In [2]:
import pyarrow.parquet as pq

parquet_file = pq.ParquetFile('C:\\Users\\admin\\Downloads\\30G_data\\part-00000.parquet')
data = parquet_file.read().to_pandas()

# data.iloc[0]
print(data.dtypes)

id                     int64
timestamp             object
user_name             object
chinese_name          object
email                 object
age                    int64
income               float64
gender                object
country               object
chinese_address       object
purchase_history      object
is_active               bool
registration_date     object
credit_score           int64
phone_number          object
dtype: object


In [4]:
is_active = data[data['is_active']==False]
len(is_active)

18750000

In [1]:
data.iloc[0:5]

NameError: name 'data' is not defined

In [7]:
import dask.dataframe as dd
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import time
import glob
import json
import pyarrow
from dask.diagnostics import ProgressBar
from dask.distributed import Client, LocalCluster

In [None]:
plt.rcParams['font.sans-serif'] = ['SimHei']  # 设置中文字体
# 启用进度条
ProgressBar().register()

In [None]:
# 读取数据
def read_multiple_parquet(pattern):
    """读取并优化数据格式"""
    print(f"开始读取文件: {pattern}")
    start = time.time()
    
    files = glob.glob(pattern)
    print(f"发现 {len(files)} 个数据文件")
    
    # 列类型映射（优化内存使用）
    type_map = {
        'id': 'category',
        'user_name': 'category',
        'chinese_name': 'category',
        'email': 'category',
        'gender': 'category',
        'country': 'category',
        'is_active': 'bool',
        'timestamp': 'datetime64[ns]',
        'registration_date': 'datetime64[ns]',
        'age': 'int8',
        'income': 'float32',
        'credit_score': 'int16'
    }
    
    ddf = dd.read_parquet(
        files,
        engine='pyarrow',
        dtype=type_map,
        parse_dates=['timestamp', 'registration_date'],
        # blocksize="256MB"  # 控制每个分区的大小
    )
    ddf['timestamp'] = dd.to_datetime(ddf['timestamp'], format='%Y-%m-%dT%H:%M:%S%z')
    ddf['registration_date'] = dd.to_datetime(ddf['registration_date'], format='%Y-%m-%d')
    # print(ddf.dtypes)
    # 处理中文地址的特殊字符
    ddf['chinese_address'] = ddf['chinese_address'].astype('string')
    
    print(f"数据读取完成，耗时：{time.time()-start:.2f}秒")
    return ddf

datasets = {
    "30GB_data": "C:/Users/admin/Downloads/30G_data/*.parquet"
}

for ds_name, pattern in datasets.items():
    print(f"\n{'='*40}")
    print(f"开始读取数据集: {ds_name}")
    print(f"{'='*40}")
    
    ds_report = {}
    start_time = time.time()
    

    read_start = time.time()
    ddf = read_multiple_parquet(pattern)
    ds_report['read_time'] = time.time() - read_start
    print(f"数据读取耗时: {ds_report['read_time']:.2f}秒")
    print(f"数据集大小: {ddf.shape[0].compute()} 行, {ddf.shape[1]} 列")



开始读取数据集: 10GB_data
开始读取文件: C:/Users/admin/Downloads/10G_data/*.parquet
发现 8 个数据文件
数据读取完成，耗时：0.01秒
数据读取耗时: 0.01秒
[########################################] | 100% Completed | 107.91 ms
数据集大小: 100000000 行, 15 列


In [16]:
def enhanced_analysis(ddf, dataset_name):
    """执行数据集探索性分析"""
    start = time.time()
    results = {}
    
    # 基础统计
    print("\n[数值型字段统计]")
    num_stats = ddf[['age', 'income', 'credit_score']].describe().compute()
    print(num_stats)
    results['numeric_stats'] = num_stats.to_dict()
    
    # 缺失值分析
    missing = ddf.isna().sum().compute()
    total = len(ddf)
    missing_pct = (missing / total * 100).round(2)
    missing_df = pd.DataFrame({'缺失数量': missing, '缺失比例(%)': missing_pct})
    
    # 可视化缺失值
    plt.figure(figsize=(12,6))
    missing_df['缺失比例(%)'].sort_values().plot(kind='barh', color='skyblue')
    plt.title(f'{dataset_name} - 字段缺失值分布',fontfamily='SimHei')
    plt.xlabel('缺失比例(%)',fontfamily='SimHei')
    plt.ylabel('字段名称',fontfamily='SimHei')
    plt.savefig(f'{dataset_name}_missing_values.png', bbox_inches='tight')
    plt.close()
    
    # 用户年龄分布
    plt.figure(figsize=(10,6))
    ddf['age'].compute().plot(kind='hist', bins=50, alpha=0.7)
    plt.title(f'{dataset_name} - 用户年龄分布',fontfamily='SimHei')
    plt.xlabel('年龄', fontfamily='SimHei')
    plt.ylabel('用户数量', fontfamily='SimHei')
    plt.savefig(f'{dataset_name}_age_dist.png')
    plt.close()
    
    # 用户活跃度分析
    if 'registration_date' in ddf.columns:
        ddf['reg_year'] = ddf['registration_date'].dt.year
        reg_dist = ddf['reg_year'].value_counts().compute().sort_index()
        
        plt.figure(figsize=(10,6))
        reg_dist.plot(kind='bar', color='teal')
        plt.title(f'{dataset_name} - 用户注册年份分布',fontfamily='SimHei')
        plt.xlabel('注册年份', fontfamily='SimHei')
        plt.ylabel('用户数量', fontfamily='SimHei')
        plt.savefig(f'{dataset_name}_reg_year.png')
        plt.close()
    
    print(f"探索性分析完成，耗时：{time.time()-start:.2f}秒")
    return results
    
for ds_name, pattern in datasets.items():
    analysis_results = enhanced_analysis(ddf, ds_name)
    ds_report.update(analysis_results)


[数值型字段统计]
[########################################] | 100% Completed | 4.76 ss
                age        income  credit_score
count  1.000000e+08  1.000000e+08  1.000000e+08
mean   5.901673e+01  4.993547e+05  5.751113e+02
std    2.395049e+01  2.890191e+05  1.590030e+02
min    1.800000e+01  0.000000e+00  3.000000e+02
25%    3.800000e+01  2.520000e+05  4.380000e+02
50%    5.900000e+01  5.020000e+05  5.760000e+02
75%    8.000000e+01  7.520000e+05  7.140000e+02
max    1.000000e+02  1.000000e+06  8.500000e+02
[########################################] | 100% Completed | 159.42 s
[########################################] | 100% Completed | 110.02 ms
[                                        ] | 0% Completed | 278.10 us

  plt.savefig(f'{dataset_name}_missing_values.png', bbox_inches='tight')


[########################################] | 100% Completed | 1.04 sms
[########################################] | 100% Completed | 32.56 s
探索性分析完成，耗时：202.67秒


In [17]:
data.isnull().any()

id                   False
timestamp            False
user_name            False
chinese_name         False
email                False
age                  False
income               False
gender               False
country              False
chinese_address      False
purchase_history     False
is_active            False
registration_date    False
credit_score         False
phone_number         False
dtype: bool

In [18]:
# 数据预处理
def enhanced_preprocessing(ddf, dataset_name):
    """数据清洗流程"""
    start = time.time()
    original_count = len(ddf)
    results = {}
    
    # 处理缺失值
    missing = ddf.isna().sum().compute()
    missing_cols = missing[missing > 0].index.tolist()
    
    # 删除高缺失率字段（>40%）
    high_missing = missing[(missing/original_count) > 0.4].index
    ddf = ddf.drop(columns=high_missing)
    results['dropped_columns'] = high_missing.tolist()
    
    # 处理年龄异常值
    age_filter = (ddf['age'] >= 18) & (ddf['age'] <= 100)
    ddf = ddf[age_filter]
    
    # 处理收入异常值（IQR方法）
    if 'income' in ddf.columns:
        q = ddf['income'].quantile([0.25, 0.75]).compute()
        iqr = q[0.75] - q[0.25]
        income_filter = (ddf['income'] >= (q[0.25] - 1.5*iqr)) & (ddf['income'] <= (q[0.75] + 1.5*iqr))
        ddf = ddf[income_filter]
    
    # 处理信用评分
    if 'credit_score' in ddf.columns:
        ddf = ddf[(ddf['credit_score'] >= 300) & (ddf['credit_score'] <= 850)]
    
    # 保存预处理结果
    processed_count = len(ddf)
    results['original_count'] = original_count
    results['processed_count'] = processed_count
    results['processing_time'] = time.time() - start
    
    print(f"数据预处理完成，耗时：{results['processing_time']:.2f}秒")
    print(f"数据量变化：{original_count} -> {processed_count}")
    
    return ddf, results

for ds_name, pattern in datasets.items():
    preprocess_start = time.time()
    cleaned_ddf, preprocess_results = enhanced_preprocessing(ddf, ds_name)
    ds_report.update(preprocess_results)
    ds_report['preprocess_time'] = time.time() - preprocess_start

[########################################] | 100% Completed | 107.08 ms
[########################################] | 100% Completed | 162.83 s
[########################################] | 100% Completed | 1.95 ss
[########################################] | 100% Completed | 157.96 s
数据预处理完成，耗时：323.01秒
数据量变化：100000000 -> 100000000


In [27]:

isactive = ddf[ddf['is_active']==False]
len(isactive)

[########################################] | 100% Completed | 154.37 s


100000000

In [29]:
# 识别潜在高价值用户
def high_value_user_analysis(ddf):
    """识别高价值用户"""
    start = time.time()
    high_value_users = ddf[
        (ddf['income'] >= 750000) & 
        (ddf['credit_score'] >= 700)
    ]

    # 展示结果（可选）
    print(f"发现 {len(high_value_users)} 个潜在高价值用户:")
    print(high_value_users)
    process_time = time.time() - start
    print(f"高价值用户分析耗时: {process_time:.2f}秒")
    return high_value_users

for ds_name, pattern in datasets.items():
    preprocess_start = time.time()
    high_value_users = high_value_user_analysis(ddf)
    ds_report['preprocess_time'] = time.time() - preprocess_start

[########################################] | 100% Completed | 154.77 s
发现 6840250 个潜在高价值用户:
Dask DataFrame Structure:
                     id       timestamp user_name chinese_name   email      age   income  gender country chinese_address purchase_history is_active registration_date credit_score phone_number reg_year
npartitions=96                                                                                                                                                                                          
                float64  datetime64[ns]    string       string  string  float64  float64  string  string          string           string    object    datetime64[ns]      float64       string    int32
                    ...             ...       ...          ...     ...      ...      ...     ...     ...             ...              ...       ...               ...          ...          ...      ...
...                 ...             ...       ...          ...     ...      ..