## 文件前提
* 左侧：data.csv,test.csv原始文件
* test_人工提取特征.csv,train_人工提取特征.csv是1.ipynb生成的
* 大数据技术基础model是在colab中预训练得到的model,用来提取机器特征
* 这个耗时很长

In [1]:
import numpy as np
import pandas as pd
import time
import cv2 
import torch
from torchvision import datasets,models,transforms,utils
from torch.utils.data import DataLoader,Dataset
from torchsummary import summary
import torch.nn as nn
import matplotlib.pyplot as plt
from PIL import Image
import os
import PIL
PIL.Image.MAX_IMAGE_PIXELS = 933120000
from pyspark.ml.linalg import DenseVector
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType,StructField,StructType,FloatType,IntegerType
from pyspark.sql.functions import udf
# Start (or reuse) a SparkSession that runs with the "local" master.
spark=SparkSession.builder.master("local").appName("大数据大作业-整合特征").getOrCreate()
# Run the network on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
from pyspark.ml import feature as ft
# Enable Arrow support and set the Arrow batch size option to 64 records.
# NOTE(review): newer Spark releases may expect the key
# "spark.sql.execution.arrow.pyspark.enabled" instead -- confirm against the installed version.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "64")
# Keep a handle on the SparkContext behind the session; the bare `sc` below is
# the last expression of the cell, so the notebook displays it.
sc=spark.sparkContext
sc

## 特征一：原表格自带的特征+提前提取好的特征

In [2]:
def Read_df_raw(filename):
    """Read a raw train/test CSV and tag every row with the split it belongs to.

    Args:
        filename: Path of the CSV file (with a header row). A file whose base
            name contains "data" is tagged "train"; any other file is tagged
            "test".

    Returns:
        The Spark DataFrame read from ``filename`` with an extra string column
        ``train_or_test`` that holds "train" or "test" on every row.
    """
    # Local import so the block is self-contained; pyspark is already a
    # dependency of this notebook.
    from pyspark.sql.functions import lit

    # Inspect only the file name: a directory component such as
    # ".../mydata/test.csv" must not turn the test file into "train".
    split = "train" if "data" in os.path.basename(filename) else "test"
    # A constant column does not need a Python UDF; lit() is evaluated inside
    # Spark without a round trip through a Python worker for every row.
    df_data_raw = spark.read.csv(filename, header=True).withColumn("train_or_test", lit(split))
    print("读取原始文件完成：")
    df_data_raw.show(5)
    return df_data_raw
def Read_human(filename):
    """Load the hand-crafted feature CSV into a Spark DataFrame.

    The first CSV column is consumed as the pandas index and is therefore not
    part of the returned columns. The schema of the result is printed.

    Args:
        filename: Path of the CSV file holding the hand-crafted features.

    Returns:
        A Spark DataFrame built from the remaining columns of the file.
    """
    features_pd = pd.read_csv(filename, index_col=0)
    # Replace the index read from the file with a plain 0..n-1 numbering.
    features_pd.index = list(range(len(features_pd.index)))
    rows = features_pd.values.tolist()
    column_names = features_pd.columns.tolist()
    df_data_human = spark.createDataFrame(rows, column_names)
    print("读取人工提取特征文件完成：")
    df_data_human.printSchema()
    return df_data_human

In [3]:
# Load the raw training table and the hand-crafted training features.
df_data_raw=Read_df_raw("data.csv")
df_data_human=Read_human("train_人工提取特征.csv")

读取原始文件完成：
+----------+----------------+-----+------------------+--------+--------+-------------+--------------------+-------------+
|product_id|product_category|brand|             price|quantity|favorite|negative_info|           image_url|train_or_test|
+----------+----------------+-----+------------------+--------+--------+-------------+--------------------+-------------+
|         1|               2|    2|127778.66305461951|       1|    1056|            0|https://lh3.googl...|        train|
|         2|               1|    1|           3250.34|       1|       2|            1|https://lh3.googl...|        train|
|         3|               1|    1| 977.9385000000001|       1|       2|            1|https://lh3.googl...|        train|
|         4|               1|    1| 977.9385000000001|       1|       2|            1|https://lh3.googl...|        train|
|         5|               1|    1|1776.1040000000003|       1|       0|            0|https://lh3.googl...|        train|
+----------+--

In [4]:
# 从pytorch导入模型
class AdaptiveConcatPool2d(nn.Module):
    """Adaptive max pooling and adaptive average pooling, concatenated on dim 1.

    Args:
        sz: Output size handed to both pooling layers; any falsy value
            (for example ``None``) selects ``(1, 1)``.
    """

    def __init__(self, sz=None):
        super().__init__()
        if not sz:
            sz = (1, 1)
        # The attribute names `ap` and `mp` are deliberately left unchanged.
        self.ap = nn.AdaptiveAvgPool2d(sz)
        self.mp = nn.AdaptiveMaxPool2d(sz)

    def forward(self, x):
        """Return the max-pooled and the average-pooled `x`, in that order, joined on dim 1."""
        pooled_max = self.mp(x)
        pooled_avg = self.ap(x)
        return torch.cat((pooled_max, pooled_avg), dim=1)
def get_model():
    """Load the pretrained network and swap its head for a 512-output feature extractor.

    The pickled model is read from "大数据技术基础model" onto the CPU. Its ``fc``
    attribute is replaced by a shorter ``nn.Sequential`` whose last Linear
    layer has 512 outputs. The state of the old head is reused, except for the
    "7.weight" / "7.bias" entries, which are dropped before loading.

    Returns:
        The loaded model with its ``fc`` attribute replaced.
    """
    # NOTE(review): torch.load unpickles a whole model object here; only load
    # files that come from a trusted source.
    model = torch.load("大数据技术基础model", map_location=torch.device('cpu'))

    # New head: it stops after the 512-unit block so the network outputs 512 values.
    head_layers = [
        nn.Flatten(),
        nn.BatchNorm1d(4096),
        nn.Dropout(0.5),
        nn.Linear(4096, 512),
        nn.ReLU(),
        nn.BatchNorm1d(512),
        nn.Dropout(0.5),
    ]
    new_fc = nn.Sequential(*head_layers)

    # Reuse the old head's state minus the entries of its layer "7".
    old_state = model.fc.state_dict()
    for dropped_key in ("7.weight", "7.bias"):
        old_state.pop(dropped_key)
    new_fc.load_state_dict(old_state)

    model.fc = new_fc
    print("模型已装载...")
    return model
class ImageDataset(Dataset):
    """Dataset that loads images from a list of file paths.

    Args:
        paths: Sequence of image file paths; item ``i`` is loaded from ``paths[i]``.
        transform: Optional callable applied to each loaded image before it is
            returned.
    """

    def __init__(self, paths, transform=None) -> None:
        super().__init__()
        self.paths = paths
        self.transform = transform

    def __len__(self):
        """Return the number of image paths."""
        return len(self.paths)

    def __getitem__(self, index):
        """Load the image at ``paths[index]`` and apply ``transform`` when one was given."""
        image = datasets.folder.default_loader(self.paths[index])
        if self.transform is not None:
            image = self.transform(image)
        return image

# Root folder of the images, with one sub-folder per split.
# NOTE(review): hard-coded absolute Windows path -- it must be adjusted (or
# made configurable) before the notebook can run on another machine.
datadir = 'E:/python/大数据技术基础/大作业/img/mydata/'
traindir = datadir + 'train/'
testdir = datadir + 'test/'
def get_paths(dir_name):
    """List the paths of the non-directory entries directly inside ``dir_name``.

    Args:
        dir_name: Folder to scan, with or without a trailing path separator.

    Returns:
        A list, sorted by entry name, of ``dir_name`` joined with every entry
        that is not a sub-directory.
    """
    paths = []
    # os.listdir() returns entries in arbitrary order; sort for reproducibility.
    for entry in sorted(os.listdir(dir_name)):
        full_path = os.path.join(dir_name, entry)
        # Test the joined path: a bare entry name would be resolved against the
        # current working directory, so sub-directories would never be skipped.
        if os.path.isdir(full_path):
            continue
        paths.append(full_path)
    return paths

# Collect the file paths found in the train and test image folders.
# NOTE(review): no later cell visible in this notebook reads these two lists --
# confirm they are still needed.
train_paths=get_paths(traindir)
test_paths=get_paths(testdir)

In [5]:

# Preprocessing applied to every image before it is fed to the network:
# centre-crop to 256x256, convert to a tensor, then normalise the three
# channels with the per-channel mean / std given below.
transform=transforms.Compose([
    transforms.CenterCrop(size=256),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])       
])
def predict(train_or_test,product_id):
    """UDF body: extract the 512 network features of one product image.

    Args:
        train_or_test: "train" selects ``traindir``; any other value selects
            ``testdir``.
        product_id: Product id as a string; the image is looked up at
            ``<dir><product_id>.jpg``.

    Returns:
        A list of 513 items: the 512 feature values followed by
        ``product_id``. When the image file does not exist, the 512 feature
        slots are ``None``.
    """
    model.eval()
    image_dir = traindir if train_or_test == "train" else testdir
    path = image_dir + product_id + ".jpg"
    if not os.path.exists(path):
        # No image on disk: emit an all-None feature row that still carries the id.
        return [None] * 512 + [product_id]
    image = transform(datasets.folder.default_loader(path))
    # Add the leading batch dimension before calling the network.
    batch = torch.reshape(image, [1, 3, 256, 256])
    model.to(device)
    with torch.no_grad():
        features = model(batch.to(device)).cpu().numpy().reshape(512,).tolist()
    features.append(product_id)
    return features
# Return schema of the UDF: 512 float columns named "1".."512", followed by
# the string product_id.
schema = StructType(
    [StructField(str(feature_no), FloatType()) for feature_no in range(1, 513)]
    + [StructField("product_id", StringType())]
)
# Wrap predict as a Spark UDF that returns the struct above.
predict_udf = udf(predict, schema)

# Load the feature-extraction network used by predict.
# ("fbm", the column name used with this UDF, stands for feature_by_machine.)
model = get_model()

模型已装载...


In [6]:
# Run the feature-extraction UDF over every training row (inference only --
# nothing is trained here), expand the returned struct into one column per
# feature, and drop rows with fewer than two non-null values (rows whose image
# was missing carry only the product_id).
df_data_machine = (
    df_data_raw
    .withColumn("fbm", predict_udf("train_or_test", "product_id"))
    .select("fbm")
    .rdd.flatMap(lambda row: row)
    .toDF(schema)
    .dropna(thresh=2)
)

In [7]:
# len_pca=50# 假定这个世界是光滑的
# def Get_feature_pca(df_machine):
#     def pca_to_array(product_id,pca_features):
#         """把pca_features的列返回为一个np.array()"""
#         res=pca_features.toArray().tolist()#rdd.flatMap(lambda x:x).collect()[0].toArray().tolist()
#         res.append(str(product_id))
#         return res
#     schema_pca=StructType([])
#     for  i in range(0,len_pca):
#         schema_pca.add(StructField(str(i+1),FloatType()))
#     schema_pca.add(StructField("product_id",StringType()))
#     pca_to_array_udf=udf(pca_to_array,schema_pca)#转换成udf
    
#     df_data_machine_features=ft.VectorAssembler(inputCols=df_machine.columns[:-1],outputCol="features")
#     df_data_machine_features=df_data_machine_features.transform(df_machine).select("product_id","features").collect()
#     df_data_machine_features=spark.createDataFrame(df_data_machine_features)    
    
#     pca_model=ft.PCA(k=len_pca,inputCol="features",outputCol="pca_features")
#     pca_model=pca_model.fit(df_data_machine_features)
#     pca_features=pca_model.transform(df_data_machine_features).collect()
#     pca_features=spark.createDataFrame(pca_features).select("product_id","pca_features")
    
#     pca_features=pca_features.withColumn("pca_features",pca_to_array_udf("product_id","pca_features")).select("pca_features").rdd.flatMap(lambda x:x).toDF(schema_pca)
#     print("对机器学习的结果进行PCA操作完成：")
#     pca_features.printSchema()
#     return (pca_model,pca_features)

In [8]:
# plt.style.use("seaborn")
# plt.rcParams['font.sans-serif']=['SimHei']   # 用黑体显示中文
# plt.rcParams['axes.unicode_minus']=False     # 正常显示负号
# plt.plot(range(0,len_pca),pca_model.explainedVariance.toArray(),"co-")
# plt.xlabel("主成分排名")
# plt.ylabel("主成分重要性")
# plt.title("主成分重要程度")

In [9]:
# Cast product_id to int and then to string so that it has the same string
# form as the product_id column of the raw table used in the join below.
# NOTE(review): presumably product_id arrives here as a double (e.g. 1.0), as
# in the printed schema of the test feature file -- confirm for the train file.
df_data_human=df_data_human.withColumn("product_id",df_data_human["product_id"].cast(IntegerType()).cast(StringType()))

In [10]:
# Left-join the raw columns with the hand-crafted features, keeping only the
# raw table's product_id column.
df_all_features = (
    df_data_raw
    .select("product_id", "product_category", "brand", "price", "quantity", "favorite", "negative_info")
    .join(df_data_human, df_data_raw.product_id == df_data_human.product_id, how="left_outer")
    .drop(df_data_human.product_id)
)
# Then left-join the network features the same way.
df_all_features = (
    df_all_features
    .join(df_data_machine, df_all_features.product_id == df_data_machine.product_id, how="left_outer")
    .drop(df_data_machine.product_id)
)
df_all_features.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- favorite: string (nullable = true)
 |-- negative_info: string (nullable = true)
 |-- coner_numbers: double (nullable = true)
 |-- face_numbers: double (nullable = true)
 |-- face_x: double (nullable = true)
 |-- faces_y: double (nullable = true)
 |-- face_size: double (nullable = true)
 |-- keypoint: double (nullable = true)
 |-- contour_numbers: double (nullable = true)
 |-- h1: double (nullable = true)
 |-- h2: double (nullable = true)
 |-- h3: double (nullable = true)
 |-- s1: double (nullable = true)
 |-- s2: double (nullable = true)
 |-- s3: double (nullable = true)
 |-- v1: double (nullable = true)
 |-- v2: double (nullable = true)
 |-- v3: double (nullable = true)
 |-- text_regions: double (nullable = true)
 |-- 1: float (nullable = true)
 |-- 2: float (nullable = true

In [11]:
# Convert the joined Spark DataFrame into a pandas DataFrame.
all_features=df_all_features.toPandas()

In [12]:
# Preview the first rows of the merged training features.
all_features.head()

Unnamed: 0,product_id,product_category,brand,price,quantity,favorite,negative_info,coner_numbers,face_numbers,face_x,...,503,504,505,506,507,508,509,510,511,512
0,1090,1,3,171.394,1,0,0,2949.0,0.0,0.0,...,-0.745802,-0.765462,-0.679552,-0.746695,-0.473874,-0.256671,2.044357,-0.069252,0.055398,-0.578162
1,1159,1,3,98.3835,1,186,0,488.0,0.0,0.0,...,-0.745802,-0.765462,-0.679552,-0.746695,-0.558266,-0.540052,-0.770851,-0.778373,-0.706384,-0.578162
2,1436,1,3,36.3296,1,0,0,695.0,0.0,0.0,...,-0.745802,-0.765462,-0.117326,1.016579,-0.635949,-0.540052,-0.770851,-0.778373,-0.706384,0.395351
3,1512,1,5,20.985,1,0,0,14067.0,0.0,0.0,...,-0.171421,5.02014,1.230139,-0.353823,-0.635949,-0.540052,-0.770851,-0.778373,-0.706384,1.207916
4,1572,1,3,168.205,1,0,0,3079.0,0.0,0.0,...,-0.745802,-0.765462,-0.679552,-0.746695,-0.635949,0.691969,1.457456,0.827745,-0.382041,-0.384171


In [13]:
# Write the merged training features to disk without the pandas index column.
all_features.to_csv("train_all_features_noPCA.csv", index=False)

## test整合特征部分

In [14]:
# Load the raw test table; Read_df_raw tags its rows with "test".
df_test_raw=Read_df_raw("test.csv")

读取原始文件完成：
+----------+----------------+-----+------------------+--------+--------+-------------+--------------------+-------------+
|product_id|product_category|brand|             price|quantity|favorite|negative_info|           image_url|train_or_test|
+----------+----------------+-----+------------------+--------+--------+-------------+--------------------+-------------+
|      8001|               3|    4|          75.41336|       1|       0|            0|https://lh3.googl...|         test|
|      8002|               3|    4|3.1391099999999996|       1|       2|            0|https://lh3.googl...|         test|
|      8003|               3|    4|252.78960000000004|       1|      38|            0|https://lh3.googl...|         test|
|      8004|               3|    4|            0.7841|       1|       0|            2|https://lh3.googl...|         test|
|      8005|               3|    4|           1.44331|       1|       0|            0|https://lh3.googl...|         test|
+----------+--

In [15]:
# Load the hand-crafted features of the test set.
df_test_human=Read_human("test_人工提取特征.csv")

读取人工提取特征文件完成：
root
 |-- product_id: double (nullable = true)
 |-- coner_numbers: double (nullable = true)
 |-- face_numbers: double (nullable = true)
 |-- face_x: double (nullable = true)
 |-- faces_y: double (nullable = true)
 |-- face_size: double (nullable = true)
 |-- keypoint: double (nullable = true)
 |-- contour_numbers: double (nullable = true)
 |-- h1: double (nullable = true)
 |-- h2: double (nullable = true)
 |-- h3: double (nullable = true)
 |-- s1: double (nullable = true)
 |-- s2: double (nullable = true)
 |-- s3: double (nullable = true)
 |-- v1: double (nullable = true)
 |-- v2: double (nullable = true)
 |-- v3: double (nullable = true)
 |-- text_regions: double (nullable = true)



In [16]:
# Run the feature-extraction UDF over every test row, expand the returned
# struct into one column per feature, and drop rows with fewer than two
# non-null values (rows whose image was missing carry only the product_id).
df_test_machine = (
    df_test_raw
    .withColumn("fbm", predict_udf("train_or_test", "product_id"))
    .select("fbm")
    .rdd.flatMap(lambda row: row)
    .toDF(schema)
    .dropna(thresh=2)
)
df_test_machine.printSchema()

root
 |-- 1: float (nullable = true)
 |-- 2: float (nullable = true)
 |-- 3: float (nullable = true)
 |-- 4: float (nullable = true)
 |-- 5: float (nullable = true)
 |-- 6: float (nullable = true)
 |-- 7: float (nullable = true)
 |-- 8: float (nullable = true)
 |-- 9: float (nullable = true)
 |-- 10: float (nullable = true)
 |-- 11: float (nullable = true)
 |-- 12: float (nullable = true)
 |-- 13: float (nullable = true)
 |-- 14: float (nullable = true)
 |-- 15: float (nullable = true)
 |-- 16: float (nullable = true)
 |-- 17: float (nullable = true)
 |-- 18: float (nullable = true)
 |-- 19: float (nullable = true)
 |-- 20: float (nullable = true)
 |-- 21: float (nullable = true)
 |-- 22: float (nullable = true)
 |-- 23: float (nullable = true)
 |-- 24: float (nullable = true)
 |-- 25: float (nullable = true)
 |-- 26: float (nullable = true)
 |-- 27: float (nullable = true)
 |-- 28: float (nullable = true)
 |-- 29: float (nullable = true)
 |-- 30: float (nullable = true)
 |-- 31: float

In [17]:
# Cast product_id (a double in the schema printed by Read_human) to int and
# then to string so it matches the string product_id of the raw table.
df_test_human = df_test_human.withColumn(
    "product_id",
    df_test_human["product_id"].cast(IntegerType()).cast(StringType()),
)
# Left-join the raw columns with the hand-crafted features, keeping only the
# raw table's product_id column.
df_all_features = (
    df_test_raw
    .select("product_id", "product_category", "brand", "price", "quantity", "favorite", "negative_info")
    .join(df_test_human, df_test_raw.product_id == df_test_human.product_id, how="left_outer")
    .drop(df_test_human.product_id)
)
# Then left-join the network features the same way.
df_all_features = (
    df_all_features
    .join(df_test_machine, df_all_features.product_id == df_test_machine.product_id, how="left_outer")
    .drop(df_test_machine.product_id)
)
print("整合之后所有特征的结构如下：")
df_all_features.printSchema()

整合之后所有特征的结构如下：
root
 |-- product_id: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- favorite: string (nullable = true)
 |-- negative_info: string (nullable = true)
 |-- coner_numbers: double (nullable = true)
 |-- face_numbers: double (nullable = true)
 |-- face_x: double (nullable = true)
 |-- faces_y: double (nullable = true)
 |-- face_size: double (nullable = true)
 |-- keypoint: double (nullable = true)
 |-- contour_numbers: double (nullable = true)
 |-- h1: double (nullable = true)
 |-- h2: double (nullable = true)
 |-- h3: double (nullable = true)
 |-- s1: double (nullable = true)
 |-- s2: double (nullable = true)
 |-- s3: double (nullable = true)
 |-- v1: double (nullable = true)
 |-- v2: double (nullable = true)
 |-- v3: double (nullable = true)
 |-- text_regions: double (nullable = true)
 |-- 1: float (nullable = true)
 |-- 2: float (

In [18]:
# Convert the joined test-set Spark DataFrame into a pandas DataFrame.
# This reuses the name `all_features`, replacing the training frame from the earlier cell.
all_features=df_all_features.toPandas()

In [19]:
# Preview the first rows of the merged test features.
all_features.head()

Unnamed: 0,product_id,product_category,brand,price,quantity,favorite,negative_info,coner_numbers,face_numbers,face_x,...,503,504,505,506,507,508,509,510,511,512
0,8304,3,4,77.61599999999999,1,0,0,2515.0,0.0,0.0,...,-0.543722,-0.765462,-0.679552,1.663159,0.574418,0.3597,-0.445884,0.340315,-0.706384,-0.11844
1,8433,2,2,1150.564,1,1128,0,24562.0,0.0,0.0,...,2.081845,-0.353054,-0.57963,0.592419,-0.576733,0.879988,-0.733702,2.297908,2.227531,-0.578162
2,9009,2,2,1372.444,1,1081,0,10672.0,0.0,0.0,...,1.350494,-0.765462,-0.29823,-0.746695,-0.635949,-0.540052,-0.35677,1.967704,-0.706384,2.291177
3,9030,2,2,4849.025,1,981,0,24149.0,0.0,0.0,...,1.153007,0.917282,0.307953,-0.746695,0.178981,1.236456,0.168556,-0.567529,-0.706384,0.167084
4,9583,1,3,53.5182,1,1,0,33272.0,0.0,0.0,...,0.438044,-0.765462,-0.679552,-0.746695,-0.635949,-0.00731,-0.699182,-0.636476,1.409035,-0.578162


In [20]:
# Write the merged test features to disk without the pandas index column.
all_features.to_csv("test_all_features_noPCA.csv", index=False)