**----------程式說明:此程式用來將raw data合併並且只保留必要欄位以利後續分析----------**  
**1. 第一個程式區塊遍歷指定年份中所有壓縮檔並且解壓縮**  
**2. 第二個程式區塊將指定年份月分中csv檔案合併並且只保留需要欄位**  
**3. 第三個程式區塊將合併完的月度csv讀進程式合併成年度csv，並且匯出成若干parquet檔案**  

In [39]:
import pandas as pd 
import os  
import gzip
import shutil
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from tqdm import tqdm

**區塊1**

In [46]:
root_folder = "D:\\2019"  # change year here

for month_folder in os.listdir(root_folder):
    month_folder_path = os.path.join(root_folder, month_folder)
    
    if os.path.isdir(month_folder_path):
        
        for subfolder in os.listdir(month_folder_path):
            subfolder_path = os.path.join(month_folder_path, subfolder) # data_*_*_*
                
            if subfolder_path.endswith('.csv.gz'):
                output_file = os.path.splitext(subfolder_path)[0] 
                with gzip.open(subfolder_path, 'rb') as f_in:
                    with open(output_file, 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
                    print(f"\rUpzip: {output_file}", end="", flush=True)

Upzip: D:\2018\all_for_2018-12-01\data_9_7_0.csvv

**區塊2**

In [47]:
root_folder = "D:\\2018" 
merged_monthly_data = [] # used to merge every month data 

for month_folder in os.listdir(root_folder):
    month_folder_path = os.path.join(root_folder, month_folder)
    
    if os.path.isdir(month_folder_path):
        monthly_data = [] # store all the data in a specific month 
        
        for file in os.listdir(month_folder_path):
            file_path = os.path.join(month_folder_path, file)
            if file.endswith('.csv'):
                df = pd.read_csv(file_path, low_memory=False)
                df = df[['id', 'last_updated_date', 'last_updated_timestamp', 'duplicates', 'posted', 'expired', 'duration',
                         'title_name', 'title_clean', 'skills_name', 'specialized_skills_name', 'common_skills_name']]
                monthly_data.append(df)
        
        if monthly_data:
            monthly_merged_df = pd.concat(monthly_data, ignore_index=True)
            monthly_merged_file = os.path.join(root_folder, f"merged_{month_folder}.csv")
            monthly_merged_df.to_csv(monthly_merged_file, index=False)
            print(f"\rAlready merged {month_folder} data and saved as {monthly_merged_file}", end="", flush=True)

Already merged all_for_2018-12-01 data and saved as D:\2018\merged_all_for_2018-12-01.csv

**區塊3**

In [48]:
root_folder = "D:\\2018" 

monthly_csv_files = [
    os.path.join(root_folder, f)
    for f in os.listdir(root_folder)
    if f.startswith("merged_") and f.endswith(".csv")
]

print("Found CSV files to merge:")
for f in monthly_csv_files:
    print(f)

ddf = dd.read_csv(monthly_csv_files, assume_missing=True) 
output_folder = os.path.join(root_folder, "trimmed_2018_parquet")

with ProgressBar():
    ddf.to_parquet(output_folder)

print(f"Saved as: {output_folder}")

Found CSV files to merge:
D:\2018\merged_all_for_2018-01-01.csv
D:\2018\merged_all_for_2018-02-01.csv
D:\2018\merged_all_for_2018-03-01.csv
D:\2018\merged_all_for_2018-04-01.csv
D:\2018\merged_all_for_2018-05-01.csv
D:\2018\merged_all_for_2018-06-01.csv
D:\2018\merged_all_for_2018-07-01.csv
D:\2018\merged_all_for_2018-08-01.csv
D:\2018\merged_all_for_2018-09-01.csv
D:\2018\merged_all_for_2018-10-01.csv
D:\2018\merged_all_for_2018-11-01.csv
D:\2018\merged_all_for_2018-12-01.csv
[########################################] | 100% Completed | 10m 4ss
Saved as: D:\2018\trimmed_2018_parquet


**區塊4**

In [49]:
folder_path = "D:\\2018\\trimmed_2018_parquet"
all_files = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith(".parquet")]
total_rows = 0
total_files = len(all_files)

for i, f in enumerate(all_files, start=1):
    df = pd.read_parquet(f)
    total_rows += len(df)
    print(f"\rProcessing file {i}/{total_files}: {os.path.basename(f)} | Total rows so far: {total_rows}", end="", flush=True)

print(f"\nFinished! Total rows: {total_rows}")

Processing file 301/301: part.99.parquet | Total rows so far: 331942402
Finished! Total rows: 33194240


In [50]:
year_folder = "D:\\2018"

for month_folder in os.listdir(year_folder):
    month_folder_path = os.path.join(year_folder, month_folder)
    
    if os.path.isdir(month_folder_path):
        for file in os.listdir(month_folder_path):
            file_path = os.path.join(month_folder_path, file)
            
            if os.path.isfile(file_path) and file.endswith(".csv"):
                os.remove(file_path)
                print(f"Deleted: {file_path}")

print("Done! All CSV files in 2018 have been deleted.")

Deleted: D:\2018\all_for_2018-01-01\data_0_0_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_0_1_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_0_2_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_0_3_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_0_4_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_0_5_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_0_6_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_0_7_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_10_0_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_10_1_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_10_2_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_10_3_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_10_4_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_10_5_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_10_6_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_10_7_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_11_0_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_11_1_0.csv
Deleted: D:\2018\all_for_2018-01-01\data_11_2_0.csv
Deleted: D:\2018\all