In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import avg, count, countDistinct, max, min, mean, variance, stddev, sum, skewness, kurtosis
from pyspark.sql.functions import hour,minute,second,year,month,dayofmonth,date_format
from pyspark.sql.functions import col, lit
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.notebook import tqdm
from time import sleep
import csv
from fitter import Fitter
import os
import shutil

In [2]:
spark = SparkSession.builder.\
    appName("local[*]").\
    getOrCreate()

22/04/15 15:36:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [116]:
class Solve:

    def __init__(self, path):
        
        """---------------------读入、去除价格为NULL、转换字段类型、建表、填充空值------------------------"""
        
        self.df06 = spark.read.option("encoding","gb18030").csv(path,sep='\t',header=True)
        self.df06 =  self.df06.filter(self.df06['ITEM_PRICE'] != 'null')
        self.df06 = self.df06.withColumn("ITEM_PRICE",self.df06.ITEM_PRICE.astype("int"))
        self.df06 = self.df06.withColumn("ITEM_SALES_VOLUME",self.df06.ITEM_SALES_VOLUME.astype("int"))
        self.df06 = self.df06.withColumn("TOTAL_EVAL_NUM",self.df06.TOTAL_EVAL_NUM.astype("int"))
        self.df06 = self.df06.withColumn("ITEM_STOCK",self.df06.ITEM_STOCK.astype("int"))
        self.df06 = self.df06.withColumn("ITEM_FAV_NUM",self.df06.ITEM_STOCK.astype("int"))
        self.df06.createOrReplaceTempView("data06")
        self.df06 = self.df06 = self.df06.na.fill('Unkown')
        self.df06 = self.df06.withColumn('CATE_NAME_LV', F.concat(self.df06['CATE_NAME_LV1'],self.df06['CATE_NAME_LV2'],self.df06['CATE_NAME_LV3'],self.df06['CATE_NAME_LV4'],self.df06['CATE_NAME_LV5']))
        
        """---------------------合并lv、添加INDEX（参数、得到文件路径------------------------"""
        
        @udf(returnType=StringType())
        def replace_(a):
            return str(a).replace(' ', '')
        self.df06 = self.df06.withColumn("CATE_NAME_LV", replace_(self.df06["CATE_NAME_LV"]))
        
        @udf(returnType=StringType())
        def plus_one(a):
            tmp = str(a).replace("：", ":").replace("；", ";")
            ans = ''
            for i in tmp.split(";"):
                ans += ' ' + i.split(":")[0]
            return ans
        self.df06 = self.df06.withColumn("INDEX", plus_one(self.df06["ITEM_PARAM"]))
        self.df06 = self.df06.withColumn('C_I', F.concat(self.df06['CATE_NAME_LV'],self.df06['INDEX']))
        self.path_l, self.path_n = os.path.split(path)
        self.name, self.cs = os.path.splitext(self.path_n)
        
    def Cal_Li(self):
        print("Cal_Li")
        """---------------------将lv和price合并并保存------------------------"""
        
        @udf(returnType=StringType())
        def add_(a):
            return str(a)+'$_$'
        self.df06 = self.df06.withColumn("C_P", add_(self.df06["CATE_NAME_LV"]))
        
        self.df06 = self.df06.withColumn('C_P', F.concat(self.df06['C_P'],self.df06['ITEM_PRICE']))
        self.df06.createOrReplaceTempView("data06")
        tmp = spark.sql('select C_P from data06')
        try:
            tmp.repartition(1).write.option("encoding","gb18030").csv('{}/{}_C_P'.format(self.path_l, self.name))
        except:
            pass
        
        """---------------------保存的文件中读取并记录每个lv的每个商品价格(self.Li)------------------------"""
        
        cou_dir, can_dir = {}, {}
        tmp_path = '{}/{}_C_P'.format(self.path_l, self.name)
        def solve():
            for i in os.listdir(tmp_path):
                if os.path.splitext(os.path.join(tmp_path,i))[1] == '.csv':
                    return os.path.join(tmp_path,i)
        filepath = solve()
        self.Li = {}
        with open(filepath, encoding='gb18030') as f:
            reader = csv.reader(f)
            header = next(reader)
            for row in tqdm(reader):
                tmp = row[0].split('$_$')
                lv = tmp[0]
                if len(tmp) < 2 or len(tmp) > 3:
                    continue
                if lv not in self.Li:
                    self.Li[lv] = [int(tmp[1])]
                else:
                    self.Li[lv].append(int(tmp[1]))

    def Deal_C_P(self):
        print("Deal_C_P")
        ll_Thr = 2
        """---------------------计算没个lv的变异系数------------------------"""
        
        ll, ss, cc = {}, [], 0
        cnt = 0
        for i in self.Li:
            if(np.mean(self.Li[i]) != 0):
                x = np.std(self.Li[i])/np.mean(self.Li[i])
                ll[i] = x
            else:
                ll[i] = 0
            if(ll[i] < ll_Thr):
                cnt += 1
        print(cnt)
        """---------------------先对数正态分布拟合再正太分布拟合，并取出阈值------------------------"""
        for i in tqdm(self.Li):
            self.Li[i] = np.sort(self.Li[i])
            
        lv_can, lv_lr = {}, {}
        for  i in tqdm((self.Li)):
            if(ll[i] < ll_Thr):
                continue
            x = self.Li[i]
            l = len(self.Li[i])//100
            r = len(self.Li[i])*99//100+1
            y = x[l: r]
            f = Fitter(y, distributions=['lognorm'])
            f.fit()
            lv_can[i] = (f.fitted_param['lognorm'][1], f.fitted_param['lognorm'][2])
            
            yy = np.log((y-lv_can[i][0])/lv_can[i][1])
            f = Fitter(yy, distributions=['norm'])
            f.fit()
            tmp = f.fitted_param['norm'][0] + 3*f.fitted_param['norm'][1]
            lv_lr[i] = np.exp(tmp)*lv_can[i][1]+lv_can[i][0]
            
        """---------------------根据阈值确定自定义字段flag1（异常）是否为1------------------------"""
        @udf(returnType=StringType())
        def cal_flag1(a):
            tmp = str(a).split('$_$')
            try:
                if len(tmp) != 2:
                    return str(1)
                if(tmp[0] not in lv_lr):
                    return str(0)
                if(int(tmp[1]) > lv_lr[tmp[0]]):
                    return str(1)
                else:
                    return str(0)
            except:
                return "11"
        self.df06 = self.df06.withColumn("flag1", cal_flag1(self.df06["C_P"]))
        
         
    def cal_Density(self):
        print("cal_Density")
        p_rating = {}
        pr_r = {}
        ard = {}
        M = 10
        for i in tqdm(self.Li):
            p_rating[i] = {}
            if len(self.Li[i]) < M:
                continue
            N = len(self.Li[i])//10
            l, r = np.min(self.Li[i]), np.max(self.Li[i])
            cou = np.zeros(3*r+10)
            fac = np.zeros(3*r+10)
            faci = np.zeros(3*r+10)
            sum_ans = np.zeros(3*r+10)
            for j in self.Li[i]:
                cou[j+r] += 1
            for j in range(1, 3*r):
                fac[j] = cou[j] + fac[j-1]
                faci[j] = faci[j-1] + cou[j]*j
            p_rating[i] = {}
            for j in np.unique(self.Li[i]):
                d = j + r
                ll , rr = 0, r+1
                while ll < rr:
                    mid = (ll + rr) // 2
                    sum = fac[d+mid] - fac[d-mid-1]-1
                    if sum >= N:
                        rr = mid
                    else:
                        ll = mid+1
                sum = fac[d+ll] - fac[d-ll-1] - 1
                sum_1 = fac[d+ll-1] - fac[d-ll] - 1
                ans = 0
                if sum == N:
                    ans += faci[d+ll] - faci[d] - d * (fac[d+ll]-fac[d])
                    ans += d * (fac[d-1]-fac[d-ll-1]) - (faci[d-1] - faci[d-ll-1])
                else:
                    ans += (N-sum_1) * (ll)
                    ans += faci[d+ll-1] - faci[d] - d * (fac[d+ll-1]-fac[d])
                    ans += d * (fac[d-1]-fac[d-ll]) - (faci[d-1] - faci[d-ll])
                p_rating[i][j] = ans/N
            
        @udf(returnType=StringType())
        def cal_rating(a):
            tmp = a.split('$_$')
            if tmp[0] in p_rating and int(tmp[1]) in p_rating[tmp[0]]:
                return str(p_rating[tmp[0]][int(tmp[1])])
            return '-1'
        self.df06 = self.df06.withColumn("rating", cal_rating(self.df06["C_P"]))

        lvmx_rating = {}
        for i in p_rating:
            for j in p_rating[i]:
                lvmx_rating[i] = np.max([lvmx_rating.get(i, 0), int(p_rating[i][j])])

        @udf(returnType=StringType())
        def add_(a):
            return str(a)+'$_$'
        self.df06 = self.df06.withColumn("C_R", add_(self.df06["CATE_NAME_LV"]))
        self.df06 = self.df06.withColumn('C_R', F.concat(self.df06['C_R'],self.df06['rating']))   
        
        @udf(returnType=StringType())
        def cal_flag2(a):
            tmp = a.split('$_$')
            if tmp[0] not in lvmx_rating:
                return '0'
            else:
                if len(str(int(lvmx_rating[tmp[0]]))) == len(tmp[1].split('.')[0]):
                    return '1'
                else:
                    return '0'
        self.df06 = self.df06.withColumn("flag2", cal_flag2(self.df06["C_R"]))

In [120]:
%%time
sum = 0
for i in range(6, 10):
    path = '../data_20210{}.tsv'.format(i)
    a = Solve(path)
    a.Cal_Li()
    a.Deal_C_P()
    a.cal_Density()
    a.df06.createOrReplaceTempView("data06")
    tmp = spark.sql('select *  from data06 where flag1 = "1" and flag2 == "1"')
    x = tmp.count()
    print(x)
    sum += x
    if not os.path.exists('./only_flag2_{}'.format(i)):
        tmp.repartition(1).write.option("encoding","gb18030").csv('./only_flag2_{}'.format(i))
print(sum)

Cal_Li


0it [00:00, ?it/s]

Deal_C_P
9961


  0%|          | 0/12460 [00:00<?, ?it/s]

  0%|          | 0/12460 [00:00<?, ?it/s]

cal_Density


  0%|          | 0/12460 [00:00<?, ?it/s]

22/04/15 20:57:04 WARN DAGScheduler: Broadcasting large task binary with size 1074.0 KiB
                                                                                

11407


22/04/15 20:57:55 WARN DAGScheduler: Broadcasting large task binary with size 1114.8 KiB
                                                                                

Cal_Li


0it [00:00, ?it/s]

Deal_C_P
10026


  0%|          | 0/12357 [00:00<?, ?it/s]

  0%|          | 0/12357 [00:00<?, ?it/s]

cal_Density


  0%|          | 0/12357 [00:00<?, ?it/s]

22/04/15 21:04:32 WARN DAGScheduler: Broadcasting large task binary with size 1049.1 KiB
                                                                                

7357


22/04/15 21:05:16 WARN DAGScheduler: Broadcasting large task binary with size 1089.8 KiB
                                                                                

Cal_Li


0it [00:00, ?it/s]

Deal_C_P
10075


  0%|          | 0/12455 [00:00<?, ?it/s]

  0%|          | 0/12455 [00:00<?, ?it/s]

cal_Density


  0%|          | 0/12455 [00:00<?, ?it/s]

22/04/15 21:11:36 WARN DAGScheduler: Broadcasting large task binary with size 1061.0 KiB
                                                                                

6670


22/04/15 21:12:23 WARN DAGScheduler: Broadcasting large task binary with size 1101.8 KiB
                                                                                

Cal_Li


0it [00:00, ?it/s]

Deal_C_P
9838


  0%|          | 0/12267 [00:00<?, ?it/s]

  0%|          | 0/12267 [00:00<?, ?it/s]

cal_Density


  0%|          | 0/12267 [00:00<?, ?it/s]

22/04/15 21:21:54 WARN DAGScheduler: Broadcasting large task binary with size 1050.9 KiB
22/04/15 21:22:41 WARN DAGScheduler: Broadcasting large task binary with size 1091.7 KiB


12021




37455
CPU times: user 25min 32s, sys: 29.6 s, total: 26min 2s
Wall time: 32min 56s


                                                                                