In [None]:
import os
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime

# --- 1. 配置信息 ---
PASSWORD = 'w20050718' 
DB_NAME = 'COMP30830_SW'
FOLDER_PATH = '/Users/alex/Documents/COMP30830_Software_Engineering/comp30380_software/data/dublinbike_status' 

# --- 2. 建立连接 ---
engine = create_engine(f"mysql+pymysql://root:{PASSWORD}@localhost:3306/{DB_NAME}")

# --- 3. 批量读取与合并 ---
all_files = [f for f in os.listdir(FOLDER_PATH) if f.endswith('.json')]
data_list = []

print(f"发现 {len(all_files)} 个 JSON 文件，开始读取...")

for filename in all_files:
    file_path = os.path.join(FOLDER_PATH, filename)
    try:
        # 读取 JSON
        df = pd.read_json(file_path)
        
        # 处理 snapshot_time：从文件名提取并转换为 DATETIME 格式
        # 格式：20260203T221449Z -> 2026-02-03 22:14:49
        snapshot_str = filename.split('_')[2].split('.')[0]
        snapshot_datetime = datetime.strptime(snapshot_str, '%Y%m%dT%H%M%SZ')
        df['snapshot_time'] = snapshot_datetime
        
        # 处理 position 列：从字典中提取 lat 和 lng
        if 'position' in df.columns and df['position'].dtype == 'object':
            df['lat'] = df['position'].apply(lambda x: x.get('lat') if isinstance(x, dict) else None)
            df['lng'] = df['position'].apply(lambda x: x.get('lng') if isinstance(x, dict) else None)
            df = df.drop('position', axis=1)
        
        # 删除无效的 banking 和 bonus 列
        df = df.drop(['banking', 'bonus'], axis=1, errors='ignore')
        
        data_list.append(df)
    except Exception as e:
        print(f"读取失败 {filename}: {e}")

# --- 4. 合并所有数据 ---
if data_list:
    final_df = pd.concat(data_list, ignore_index=True)
    print(f"✅ 数据读取完成，共 {len(final_df)} 行，{len(final_df.columns)} 列")
else:
    print("❌ 文件夹内未发现有效的 JSON 文件")

In [28]:
# --- 5. 数据类型转换 ---
print("=== 数据类型转换开始 ===")

# 将指定列转换为 category 类型，节省内存
category_cols = ['contract_name', 'name', 'address', 'status']
for col in category_cols:
    if col in final_df.columns:
        final_df[col] = final_df[col].astype('category')
print(f"✅ {len(category_cols)} 列转换为 category")

# 将 last_update 从毫秒级Unix时间戳转换为 DATETIME 格式
if 'last_update' in final_df.columns:
    final_df['last_update'] = pd.to_datetime(final_df['last_update'], unit='ms')
print(f"✅ last_update 转换为 DATETIME")

# 将 snapshot_time 转换为 DATETIME
if 'snapshot_time' in final_df.columns:
    final_df['snapshot_time'] = pd.to_datetime(final_df['snapshot_time'])
print(f"✅ snapshot_time 转换为 DATETIME")

# --- 6. 写入数据库 ---
print(f"\n=== 写入数据库开始 ===")
print(f"总计 {len(final_df)} 行数据，{len(final_df.columns)} 列")

final_df.to_sql('dublin_bike_data', con=engine, if_exists='replace', index=False)

print("✅ 批量导入成功！")
print(f"\n=== 最终数据类型 ===")
print(final_df.dtypes)

=== 数据类型转换开始 ===
✅ 4 列转换为 category
✅ last_update 转换为 DATETIME
✅ snapshot_time 转换为 DATETIME

=== 写入数据库开始 ===
总计 66120 行数据，12 列
✅ 批量导入成功！

=== 最终数据类型 ===
number                            int64
contract_name                  category
name                           category
address                        category
bike_stands                       int64
available_bike_stands             int64
available_bikes                   int64
status                         category
last_update              datetime64[ns]
snapshot_time            datetime64[us]
lat                             float64
lng                             float64
dtype: object


In [32]:
!mysql -u root -p

Enter password: 