In [1]:
import os
print("当前目录文件:")
print(os.listdir('.'))

# 检查CSV文件是否存在
if 'Walmart.csv' in os.listdir('.'):
    print("✅ Walmart.csv 上传成功!")
else:
    print("❌ 未找到Walmart.csv文件")

当前目录文件:
['Walmart.csv', '.ipynb_checkpoints', 'Untitled.ipynb', '.virtual_documents']
✅ Walmart.csv 上传成功!


In [4]:
# 在DSW中运行这个检查
from odps import ODPS
import pandas as pd

# 连接MaxCompute
try:
    odps = ODPS.from_environments()
    print(f"✅ 连接成功，项目: {odps.project}")
except:
    print("❌ 连接失败")

# 检查表是否存在和数据
try:
    # 检查表
    table = odps.get_table('walmart_sales_raw')
    print(f"表 walmart_sales_raw 存在")
    print(f"表结构: {[col.name for col in table.schema.columns]}")
    
    # 检查数据量
    count_sql = "SELECT COUNT(*) as cnt FROM walmart_sales_raw"
    result = odps.execute_sql(count_sql)
    record_count = list(result)[0].cnt
    print(f"表中记录数: {record_count}")
    
    if record_count == 0:
        print("❌ 表中没有数据")
    else:
        print("✅ 表中有数据")
        
except Exception as e:
    print(f"检查表时出错: {e}")

❌ 连接失败
检查表时出错: 'NoneType' object has no attribute 'get_table'


In [12]:
# 修复数据格式问题的上传代码

import pandas as pd
from odps import ODPS
import numpy as np

print("🔧 修复数据格式并重新上传...")

# 使用已建立的连接
print("使用现有连接:", odps.project)

# 读取CSV数据
print("\n📁 重新读取和处理CSV数据...")
try:
    df = pd.read_csv('Walmart.csv')
    print(f"原始数据形状: {df.shape}")
    print(f"原始列名: {list(df.columns)}")
    print("前3行数据:")
    print(df.head(3))
    
except Exception as e:
    print(f"❌ 读取CSV失败: {e}")
    raise

# 数据清洗和格式转换
print("\n🧹 详细数据清洗...")

# 重命名列
df.columns = ['store', 'date_str', 'weekly_sales', 'holiday_flag', 
              'temperature', 'fuel_price', 'cpi', 'unemployment']

print("重命名后的列:", list(df.columns))

# 数据类型检查和转换
print("\n📊 数据类型处理...")
print("处理前的数据类型:")
print(df.dtypes)

# 逐列处理数据类型
df_processed = pd.DataFrame()

# 处理store列 - 转换为整数
try:
    df_processed['store'] = pd.to_numeric(df['store'], errors='coerce').astype('int64')
    print("✅ store列处理完成")
except Exception as e:
    print(f"❌ store列处理失败: {e}")

# 处理date_str列 - 确保为字符串
try:
    df_processed['date_str'] = df['date_str'].astype(str)
    print("✅ date_str列处理完成")
except Exception as e:
    print(f"❌ date_str列处理失败: {e}")

# 处理weekly_sales列 - 转换为浮点数
try:
    df_processed['weekly_sales'] = pd.to_numeric(df['weekly_sales'], errors='coerce').astype('float64')
    print("✅ weekly_sales列处理完成")
except Exception as e:
    print(f"❌ weekly_sales列处理失败: {e}")

# 处理holiday_flag列 - 转换为整数
try:
    df_processed['holiday_flag'] = pd.to_numeric(df['holiday_flag'], errors='coerce').astype('int64')
    print("✅ holiday_flag列处理完成")
except Exception as e:
    print(f"❌ holiday_flag列处理失败: {e}")

# 处理temperature列 - 转换为浮点数
try:
    df_processed['temperature'] = pd.to_numeric(df['temperature'], errors='coerce').astype('float64')
    print("✅ temperature列处理完成")
except Exception as e:
    print(f"❌ temperature列处理失败: {e}")

# 处理fuel_price列 - 转换为浮点数
try:
    df_processed['fuel_price'] = pd.to_numeric(df['fuel_price'], errors='coerce').astype('float64')
    print("✅ fuel_price列处理完成")
except Exception as e:
    print(f"❌ fuel_price列处理失败: {e}")

# 处理cpi列 - 转换为浮点数
try:
    df_processed['cpi'] = pd.to_numeric(df['cpi'], errors='coerce').astype('float64')
    print("✅ cpi列处理完成")
except Exception as e:
    print(f"❌ cpi列处理失败: {e}")

# 处理unemployment列 - 转换为浮点数
try:
    df_processed['unemployment'] = pd.to_numeric(df['unemployment'], errors='coerce').astype('float64')
    print("✅ unemployment列处理完成")
except Exception as e:
    print(f"❌ unemployment列处理失败: {e}")

print("\n处理后的数据类型:")
print(df_processed.dtypes)

# 检查和处理缺失值
print("\n🔍 检查缺失值...")
missing_counts = df_processed.isnull().sum()
print("缺失值统计:")
print(missing_counts)

# 删除有缺失值的行
df_clean = df_processed.dropna()
print(f"\n清洗后数据: {len(df_processed)} → {len(df_clean)} 行")

print("\n清洗后的样本数据:")
print(df_clean.head(3))

# 检查MaxCompute表结构
print("\n📋 检查MaxCompute表结构...")
try:
    table = odps.get_table('walmart_sales_raw')
    print("表结构信息:")
    for col in table.table_schema.columns:
        print(f"  {col.name}: {col.type}")
except Exception as e:
    print(f"获取表结构失败: {e}")

# 准备上传数据 - 转换为适合MaxCompute的格式
print("\n🔄 准备上传格式...")

def prepare_data_for_maxcompute(df):
    """准备适合MaxCompute的数据格式"""
    prepared_data = []
    
    for index, row in df.iterrows():
        # 将每行转换为tuple格式，这是MaxCompute writer期望的格式
        record = (
            int(row['store']),                    # BIGINT
            str(row['date_str']),                 # STRING
            float(row['weekly_sales']),           # DOUBLE
            int(row['holiday_flag']),             # BIGINT
            float(row['temperature']),            # DOUBLE
            float(row['fuel_price']),             # DOUBLE
            float(row['cpi']),                    # DOUBLE
            float(row['unemployment'])            # DOUBLE
        )
        prepared_data.append(record)
        
        # 每1000行显示一次进度
        if (index + 1) % 1000 == 0:
            print(f"准备进度: {index + 1}/{len(df)}")
    
    return prepared_data

# 准备数据
print("开始准备数据...")
prepared_records = prepare_data_for_maxcompute(df_clean)
print(f"✅ 准备完成，共 {len(prepared_records)} 条记录")

print("前3条准备好的记录:")
for i, record in enumerate(prepared_records[:3]):
    print(f"  记录 {i+1}: {record}")

# 上传数据
def upload_prepared_data(odps, records):
    """上传准备好的数据"""
    print("\n⬆️ 开始上传准备好的数据...")
    
    try:
        # 清空表
        print("清空现有数据...")
        truncate_sql = "TRUNCATE TABLE walmart_sales_raw"
        truncate_instance = odps.execute_sql(truncate_sql)
        truncate_instance.wait_for_success()
        print("✅ 表已清空")
        
        # 获取表对象
        table = odps.get_table('walmart_sales_raw')
        
        # 批量上传
        batch_size = 1000
        total_records = len(records)
        
        print(f"开始批量上传，总计 {total_records} 条记录")
        
        with table.open_writer() as writer:
            for i in range(0, total_records, batch_size):
                batch = records[i:i+batch_size]
                
                # 写入批次数据
                for record in batch:
                    writer.write(record)
                
                progress = ((i + len(batch)) / total_records) * 100
                print(f"批次 {i//batch_size + 1}: {len(batch)} 条记录 | 进度: {progress:.1f}%")
        
        print("✅ 数据写入完成!")
        return True
        
    except Exception as e:
        print(f"❌ 上传失败: {e}")
        import traceback
        traceback.print_exc()
        return False

# 执行上传
success = upload_prepared_data(odps, prepared_records)

# 验证结果
if success:
    print("\n🔍 验证上传结果...")
    
    def safe_execute_sql(odps, sql):
        """安全执行SQL"""
        try:
            instance = odps.execute_sql(sql)
            instance.wait_for_success()
            with instance.open_reader() as reader:
                return list(reader)
        except Exception as e:
            print(f"SQL执行失败: {e}")
            return None
    
    # 检查记录数
    count_results = safe_execute_sql(odps, "SELECT COUNT(*) as cnt FROM walmart_sales_raw")
    if count_results:
        final_count = count_results[0][0]
        print(f"✅ 最终记录数: {final_count}")
    
    # 查看样本数据
    sample_results = safe_execute_sql(odps, "SELECT * FROM walmart_sales_raw LIMIT 5")
    if sample_results:
        print("\n📋 MaxCompute中的样本数据:")
        for i, row in enumerate(sample_results):
            print(f"  行 {i+1}: 门店={row[0]}, 日期={row[1]}, 销量={row[2]}")
    
    print("\n🎉 数据上传完全成功!")
    print("现在可以在DataWorks中查询数据了!")
    
else:
    print("\n❌ 数据上传失败，请检查错误信息")

🔧 修复数据格式并重新上传...
使用现有连接: ds_case_demo

📁 重新读取和处理CSV数据...
原始数据形状: (6435, 8)
原始列名: ['Store', 'Date', 'Weekly_Sales', 'Holiday_Flag', 'Temperature', 'Fuel_Price', 'CPI', 'Unemployment']
前3行数据:
   Store        Date  Weekly_Sales  Holiday_Flag  Temperature  Fuel_Price  \
0      1  05-02-2010    1643690.90             0        42.31       2.572   
1      1  12-02-2010    1641957.44             1        38.51       2.548   
2      1  19-02-2010    1611968.17             0        39.93       2.514   

          CPI  Unemployment  
0  211.096358         8.106  
1  211.242170         8.106  
2  211.289143         8.106  

🧹 详细数据清洗...
重命名后的列: ['store', 'date_str', 'weekly_sales', 'holiday_flag', 'temperature', 'fuel_price', 'cpi', 'unemployment']

📊 数据类型处理...
处理前的数据类型:
store             int64
date_str         object
weekly_sales    float64
holiday_flag      int64
temperature     float64
fuel_price      float64
cpi             float64
unemployment    float64
dtype: object
✅ store列处理完成
✅ date_str列处