In [18]:
from collections import namedtuple 
import shutil, time, re
from src.config.spark_manager import spark_session
from src.constants.training_pipeline import *
from src.components.data_validation import add_mean_indicator_col_per_user
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel
from pyspark.ml.pipeline import Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer, IndexToString, MinMaxScaler, VectorAssembler
from pyspark.sql.functions import lit, col, DataFrame, min, max
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

In [2]:
def read_downloaded_data(paths:list):
    """Read every CSV found under each directory in ``paths`` into one DataFrame.

    Each file is read with a header row and schema inference. Two literal
    columns are added:

    * ``USER_COLUMN_NAME``  -> ``"<file name before the first dot>_<directory name>"``
    * ``TARGET_COLUMN_NAME`` -> ``"<directory name>"``

    Frames are combined with ``DataFrame.union``, which matches columns by
    position, so all CSV files are assumed to share one column layout
    (TODO confirm against the downloaded data).

    Args:
        paths: directories to scan; the last component of each directory
            path is used as the user-type label. A trailing path separator
            is tolerated.

    Returns:
        The union of all files read from all directories.

    Raises:
        ValueError: if no file was found in any of the directories.
        Exception: any error raised while listing or reading is printed and
            then re-raised instead of being swallowed (a silent ``None``
            return only fails later with an unrelated error).
    """
    # Local import: `os` is not imported explicitly in the notebook's import cell.
    import os

    try:
        #logging.info("Entered read_downloaded_data method")
        combined_df = None
        for path in paths:
            # normpath strips a trailing separator, so '.../UGE/' still gives
            # 'UGE' rather than the empty string a plain split('/') produces.
            user_type = os.path.basename(os.path.normpath(path))

            type_df = None
            # sorted() only makes the union order reproducible between runs.
            for file in sorted(os.listdir(path)):
                file_path = os.path.join(path, file)
                user = file.split(sep='.')[0]
                user_df = spark_session.read.csv(file_path, header=True, inferSchema=True)
                user_df = user_df.withColumn(USER_COLUMN_NAME, lit(f"{user}_{user_type}"))
                type_df = user_df if type_df is None else type_df.union(user_df)

            if type_df is None:
                # Empty directory: contribute nothing rather than reusing the
                # previous directory's frame under this directory's label.
                continue

            type_df = type_df.withColumn(TARGET_COLUMN_NAME, lit(f"{user_type}"))
            combined_df = type_df if combined_df is None else combined_df.union(type_df)

        if combined_df is None:
            raise ValueError(f"no data files found under {paths}")
        #logging.info(f"reading of CSV is done")
        return combined_df
    except Exception as e:
        #logging.error(e)
        print(e)
        raise

In [3]:
# The last component of each path becomes the user-type label, so both
# directories are given in the same form, without a trailing slash (a trailing
# '/' leaves an empty last component when the path is split on '/').
df = read_downloaded_data(['../user_downloaded_data/UBE', '../user_downloaded_data/UGE'])

In [4]:
# Total number of rows read, before any filtering is applied.
df.count()

134764

In [5]:
# Keep only rows in which every indicator column is below the threshold.
for indicator_name in INDICATOR_COLS:
    below_threshold = col(indicator_name) < INDICATOR_THRESHOLD
    df = df.filter(below_threshold)

# Add the per-user indicator column(s) via the project helper (presumably
# per-user means, going by its name -- confirm in data_validation), then drop
# the columns that are not needed downstream.
df = (
    add_mean_indicator_col_per_user(df, USER_COLUMN_NAME, INDICATOR_COLS)
    .drop(*COLS_TO_BE_REMOVED)
)

                                                                                

In [6]:
# Shape check: [number of rows, number of columns] of the current df.
[df.count(), len(df.columns)]

[126632, 19]

In [18]:
def prepare_train_test_data(data: DataFrame, train_percentage:float, 
                                categorical_cols: list)-> tuple:
    """Split ``data`` into train/test and make train cover every test category.

    The split uses ``randomSplit`` with a fixed seed (43). For each column in
    ``categorical_cols``, any value that occurs in the test split but not in
    the train split is looked up in test, and one test row carrying that value
    is appended to train. The test split itself is returned unchanged, so
    those rows appear in both splits.

    Args:
        data: the full dataset to split.
        train_percentage: weight of the train split; the test split gets
            ``1 - train_percentage``.
        categorical_cols: names of the columns whose values must all be
            present in the train split.

    Returns:
        A ``(train, test)`` tuple of DataFrames.

    Raises:
        Exception: any error is printed and then re-raised; returning ``None``
            would only break the caller's tuple unpacking with a less helpful
            error.
    """
    try:
        #logging.info("Entered prepare_train_test_data method")
        train, test = data.randomSplit([train_percentage, 1 - train_percentage], seed=43)
        #train, test = data.randomSplit([train_percentage, 1 - train_percentage])

        # One single-row frame per unseen (column, value) pair. They are only
        # unioned into train after the loop, so every column is compared
        # against the original train split.
        supplements = []
        for column in categorical_cols:
            cat_train_df = train.select(col(column))
            cat_test_df = test.select(col(column))
            df_diff = cat_test_df.subtract(cat_train_df).collect()
            print(f"column {column} in test dataset has {len(df_diff)} values not present in train dataset")
            for row in df_diff:
                # eqNullSafe: subtract() reports an unseen NULL category as a
                # value, but `== None` never matches any row.
                matching = test.where(col(column).eqNullSafe(row[column]))
                supplements.append(matching.dropDuplicates([column]))

        # No count() guard needed: with nothing to add the loop is skipped.
        for extra_rows in supplements:
            train = train.union(extra_rows)
        #logging.info(f"train and test split done. train count is {train.count()}, test count is {test.count()}")
        return train, test
    except Exception as e:
        #logging.error(e)
        print(e)
        raise


In [2]:
# Load the saved dataset with a header row and inferred column types.
# Note: this rebinds `df`, replacing whatever frame it referred to before.
df = (
    spark_session.read
    .options(header=True, inferSchema=True)
    .csv('./final_data2.csv')
)

                                                                                

In [11]:
# Number of rows for each (day, user) pair.
(
    df.select('day', 'user')
    .groupBy('day', 'user')
    .count()
    .show()
)



+----------+-----+-----+
|       day| user|count|
+----------+-----+-----+
|2021-06-16|152.0| 3803|
|2021-06-12|151.0|  844|
|2021-06-14|153.0| 1635|
|2021-06-15|152.0| 3485|
|2021-06-12|152.0| 3777|
|2021-06-14|151.0| 1037|
|2021-06-10|153.0|  220|
|2021-06-14|150.0|  402|
|2021-06-15|151.0| 1789|
|2021-06-12|154.0| 6236|
|2021-06-10|150.0|  165|
|2021-06-12|153.0| 1132|
|2021-06-12|150.0|  549|
|2021-06-13|153.0| 1443|
|2021-06-13|150.0|  573|
|2021-06-13|152.0| 3856|
|2021-06-14|154.0| 4100|
|2021-06-16|153.0| 1209|
|2021-06-10|151.0|  726|
|2021-06-10|154.0|  210|
+----------+-----+-----+
only showing top 20 rows



                                                                                

In [28]:
# Number of rows for each (day, result) pair.
(
    df.select('day', 'result')
    .groupBy('day', 'result')
    .count()
    .show()
)



+----------+------+------+
|       day|result| count|
+----------+------+------+
|2021-06-11|   uge|261331|
|2021-06-12|   uge|294052|
|2021-06-16|   uge|270988|
|2021-06-13|   uge|303593|
|2021-06-15|   uge|265336|
|2021-06-10|   uge|117804|
|2021-06-14|   uge|311565|
|2021-06-12|   ube|294773|
|2021-06-14|   ube|314681|
|2021-06-11|   ube|277704|
|2021-06-16|   ube|274946|
|2021-06-15|   ube|270898|
|2021-06-10|   ube|136994|
|2021-06-13|   ube|292707|
+----------+------+------+



                                                                                

In [5]:
# Earliest and latest day present in the data.
# `min` / `max` here are the pyspark.sql.functions aggregates imported in the
# first cell, not the Python builtins.
df.agg(min('day'), max('day')).show()



+----------+----------+
|  min(day)|  max(day)|
+----------+----------+
|2021-06-10|2021-06-16|
+----------+----------+



                                                                                

In [14]:
# Load one user's raw CSV for the scaling experiment in the following cells.
df_1 = spark_session.read.csv('../user_downloaded_data/UBE/user1.csv', inferSchema=True, header=True)
# NOTE(review): this cell used to end with `df_1 = df_1.withColumn('day', )`,
# which raises TypeError because withColumn requires a column expression.
# The `day` column shown downstream appears to come straight from the CSV, so
# the unfinished call is dropped. TODO: if `day` was meant to be cast or
# derived, add `df_1 = df_1.withColumn('day', <expression>)` here.

In [23]:
# MinMaxScaler works on vector columns, so `hour` is first wrapped into the
# single-element vector `v_hour` and then rescaled into `s_hour`.
vect = VectorAssembler(inputCols=['hour'], outputCol='v_hour')
min_max_scalar = MinMaxScaler(inputCol='v_hour', outputCol='s_hour')

# Run both stages as one pipeline, fitted on and applied to df_1.
hour_scaling = Pipeline(stages=[vect, min_max_scalar])
df_2 = hour_scaling.fit(df_1).transform(df_1)

                                                                                

In [24]:
# Preview the result to check the assembled `v_hour` vector and the scaled
# `s_hour` column next to the original columns.
df_2.show()

+----------+----+-------------------+----------+----------+----------+----------+----------+----------+----------+----------+------+--------------------+
|       day|hour|       specifictime|indicator1|indicator2|indicator3|indicator4|indicator5|indicator6|indicator7|indicator8|v_hour|              s_hour|
+----------+----+-------------------+----------+----------+----------+----------+----------+----------+----------+----------+------+--------------------+
|2021-06-10|  18|2021-06-10 18:09:59|      45.0|      36.0|       3.0|      45.0|     229.0|      45.0|       0.2|       0.0|[18.0]|[0.7826086956521738]|
|2021-06-10|  18|2021-06-10 18:09:59|      33.0|       3.0|       0.0|      33.0|       2.0|      33.0|       0.0|       0.0|[18.0]|[0.7826086956521738]|
|2021-06-10|  18|2021-06-10 18:09:59|      24.0|       2.0|       5.0|      24.0|       2.0|      24.0|       0.0|       0.0|[18.0]|[0.7826086956521738]|
|2021-06-10|  18|2021-06-10 18:09:59|       5.0|       1.0|       2.0|      

In [26]:
# Inspect the column names and the types Spark inferred for df.
df.schema

StructType(List(StructField(_c0,IntegerType,true),StructField(day,StringType,true),StructField(hour,IntegerType,true),StructField(specifictime,StringType,true),StructField(indicator1,DoubleType,true),StructField(indicator2,DoubleType,true),StructField(indicator3,DoubleType,true),StructField(indicator4,DoubleType,true),StructField(indicator5,DoubleType,true),StructField(indicator6,DoubleType,true),StructField(indicator7,DoubleType,true),StructField(indicator8,DoubleType,true),StructField(user,DoubleType,true),StructField(result,StringType,true)))