# Group Assignment - Group 3

#### The aim of this project is to compare the mathematics class with the portuguese class in order to understand how several factors affect the students grades.

### We first import what we need to work with the data and create the Spark session

In [1]:
import findspark
findspark.init()

import pandas as pd
pd.set_option('display.max_colwidth', None)

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = ' pyspark-shell'

from pyspark.sql.session import SparkSession

import pyspark.sql.functions as F

spark = (SparkSession.builder
.appName("Group 3 Assignment")
.config("spark.sql.warehouse.dir","hdfs://localhost:9000/warehouse")
.getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### RAW to STD

#### We modify the schema to adapt it to our work needs, after we save the files as .parquet in the std folder of our data lake

In [2]:
articles_raw = (spark.read.option("inferSchema", "true")
                    .option("header", "true")
                    .csv("hdfs://localhost:9000/group_assignment/datalake/raw/articles"))

                                                                                

In [3]:
customers_raw = (spark.read.option("inferSchema", "true")
                    .option("header", "true")
                    .csv("hdfs://localhost:9000/group_assignment/datalake/raw/customers"))

                                                                                

In [4]:
transactions_raw = (spark.read.option("inferSchema", "true")
                    .option("header", "true")
                    .csv("hdfs://localhost:9000/group_assignment/datalake/raw/transactions"))

                                                                                

In [5]:
articles_raw.printSchema()

root
 |-- article_id: integer (nullable = true)
 |-- product_code: integer (nullable = true)
 |-- prod_name: string (nullable = true)
 |-- product_type_no: integer (nullable = true)
 |-- product_type_name: string (nullable = true)
 |-- product_group_name: string (nullable = true)
 |-- graphical_appearance_no: integer (nullable = true)
 |-- graphical_appearance_name: string (nullable = true)
 |-- colour_group_code: integer (nullable = true)
 |-- colour_group_name: string (nullable = true)
 |-- perceived_colour_value_id: integer (nullable = true)
 |-- perceived_colour_value_name: string (nullable = true)
 |-- perceived_colour_master_id: integer (nullable = true)
 |-- perceived_colour_master_name: string (nullable = true)
 |-- department_no: integer (nullable = true)
 |-- department_name: string (nullable = true)
 |-- index_code: string (nullable = true)
 |-- index_name: string (nullable = true)
 |-- index_group_no: integer (nullable = true)
 |-- index_group_name: string (nullable = true)

In [6]:
articles_std = articles_raw.select(
    F.col("article_id").cast("string"),
    F.col("product_code").cast("string"),
    F.col("prod_name"),
    F.col("product_type_no").cast("string"),
    F.col("product_type_name"),
    F.col("product_group_name"),
    F.col("graphical_appearance_no").cast("string"),
    F.col("graphical_appearance_name"),
    F.col("colour_group_code").cast("string"),
    F.col("colour_group_name"),
    F.col("perceived_colour_value_id").cast("string"),
    F.col("perceived_colour_value_name"),
    F.col("perceived_colour_master_id").cast("string"),
    F.col("perceived_colour_master_name"),
    F.col("department_no").cast("string"),
    F.col("department_name"),
    F.col("index_code").cast("string"),
    F.col("index_name"),
    F.col("index_group_no").cast("string"),
    F.col("index_group_name"),
    F.col("section_no").cast("string"),
    F.col("section_name"),
    F.col("garment_group_no").cast("string"),
    F.col("garment_group_name"),
    F.col("detail_desc"),
)

In [7]:
articles_std.write.mode("overwrite").parquet("hdfs://localhost:9000/group_assignment/datalake/std/articles")

                                                                                

In [8]:
customers_raw.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- FN: double (nullable = true)
 |-- Active: double (nullable = true)
 |-- club_member_status: string (nullable = true)
 |-- fashion_news_frequency: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- postal_code: string (nullable = true)



In [9]:
customers_std = customers_raw.select(
    F.col("customer_id"),
    F.col("FN").cast("string"),
    F.col("active").cast("string"),
    F.col("club_member_status"),
    F.col("fashion_news_frequency"),
    F.col("age"),
    F.col("postal_code").cast("string"),
)

In [10]:
customers_std.write.mode("overwrite").parquet("hdfs://localhost:9000/group_assignment/datalake/std/customers")

                                                                                

In [11]:
transactions_raw.printSchema()

root
 |-- t_dat: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- article_id: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- sales_channel_id: integer (nullable = true)



In [12]:
transactions_std = transactions_raw.select(
    F.col("t_dat"),
    F.col("customer_id"),
    F.col("article_id").cast("string"),
    F.col("price"),
    F.col("sales_channel_id").cast("string"),
)

In [13]:
transactions_std = transactions_std.withColumn("year", F.substring("t_dat", 0, 4))
transactions_std = transactions_std.withColumn("month", F.substring("t_dat", 6, 2))
transactions_std = transactions_std.withColumn("day", F.substring("t_dat", 9, 2))

In [14]:
transactions_std.write.mode("overwrite").parquet("hdfs://localhost:9000/group_assignment/datalake/std/transactions")

                                                                                

### Working with our standardised datasets

#### We load our files from our datalake/std folder and begin to operate on the datasets

In [15]:
articles_std = spark.read.parquet("hdfs://localhost:9000/group_assignment/datalake/std/articles")

In [16]:
customers_std = spark.read.parquet("hdfs://localhost:9000/group_assignment/datalake/std/customers")

In [17]:
transactions_std = spark.read.parquet("hdfs://localhost:9000/group_assignment/datalake/std/transactions")

#  Analysing Customers Profiles

In [18]:
(customers_std.groupBy("club_member_status")
        .agg(
            F.count("*").alias("count"))
        .sort(F.col("count").desc())
        ).toPandas()

                                                                                

Unnamed: 0,club_member_status,count
0,ACTIVE,1272491
1,PRE-CREATE,92960
2,,6062
3,LEFT CLUB,467


In [19]:
age_range = F.udf(lambda age: '< 20' if age < 20 else 
                       '20-25' if (age >= 20 and age < 25) else
                       '25-30' if (age >= 25 and age < 30) else
                       '30-35' if (age >= 30 and age < 35) else
                       '35-40' if (age >= 35 and age < 40) else
                       '40-45' if (age >= 40 and age < 45) else
                       '45-50' if (age >= 45 and age < 50) else
                       '50-55' if (age >= 50 and age < 55) else
                       '55-60' if (age >= 55 and age < 60) else
                       '60-65' if (age >= 60 and age < 65) else
                       '65-70' if (age >= 65 and age < 70) else
                        '75+'  if (age >= 70) else '')

customers_std=customers_std.na.fill(value=0,subset=["age"])
customers_std=customers_std.withColumn('age_range',age_range(customers_std.age))

In [20]:
# Age ranges most frecuents in the customers

(customers_std.groupBy("age_range")
        .agg(
            F.count("*").alias("count"))
        ).sort(F.col("count").desc()).toPandas()

                                                                                

Unnamed: 0,age_range,count
0,20-25,285586
1,25-30,242772
2,30-35,150520
3,50-55,135326
4,45-50,118625
5,55-60,90916
6,< 20,87444
7,40-45,85493
8,35-40,83548
9,60-65,48148


# Analysing Transactions

In [21]:
transactions_gold=transactions_std.withColumn("is_weekend",F.dayofweek("t_dat").isin([1,7]).cast("int"))

transactions_gold.groupBy("is_weekend").agg(F.count("*").alias("count")).sort(F.col("count").desc()).toPandas()

                                                                                

Unnamed: 0,is_weekend,count
0,0,22705232
1,1,9083092


In [23]:
#Analising sale via web

transactions_gold.where("sales_channel_id==2").groupBy("is_weekend").agg(F.count("*").alias("count")).sort(F.col("count").desc()).toPandas()

                                                                                

Unnamed: 0,is_weekend,count
0,0,16157469
1,1,6222393


In [24]:
#Analising sale via offline
transactions_gold.where("sales_channel_id==1").groupBy("is_weekend").agg(F.count("*").alias("count")).sort(F.col("count").desc()).toPandas()

                                                                                

Unnamed: 0,is_weekend,count
0,0,6547763
1,1,2860699


# Analysing Articles

In [31]:
# We add colour coding to make a more visual appealing experience for the reader

articles_gold=articles_std.withColumn("colour_symbol",
                       F.when((articles_std.colour_group_name).like("%Black%"),F.lit("🖤"))\
                      .when((articles_std.colour_group_name).like("%White%"),F.lit("🤍"))\
                      .when((articles_std.colour_group_name).like("%Blue%"),F.lit("💙"))\
                      .when((articles_std.colour_group_name).like("%Lilac Purple%"),F.lit("💜"))\
                      .when((articles_std.colour_group_name).like("%Red%"),F.lit("♥"))\
                      .when((articles_std.colour_group_name).like("%Orange%"),F.lit("🧡"))\
                      .when((articles_std.colour_group_name).like("%Green%"),F.lit("💚"))\
                      .when((articles_std.colour_group_name).like("%Yellow%"),F.lit("💛"))\
                      .when((articles_std.colour_group_name).like("%Pink%"),F.lit("💖"))\
                      .when((articles_std.colour_group_name).like("%Brown%"),F.lit("🤎"))\
                                      .otherwise(F.lit(None)))

In [32]:
# Filter showing products and their dependent index name

columns=['index_group_name','product_group_name']
articles_gold.groupBy(columns).agg(F.count("*").alias("count")).sort(F.col("count").desc()).toPandas()

Unnamed: 0,index_group_name,product_group_name,count
0,Ladieswear,Garment Upper body,14110
1,Baby/Children,Garment Upper body,13178
2,Baby/Children,Garment Lower body,7371
3,Divided,Garment Upper body,6999
4,Menswear,Garment Upper body,6855
...,...,...,...
56,Sport,Shoes,2
57,Baby/Children,Bags,2
58,Baby/Children,Fun,2
59,Menswear,Items,1


In [33]:
articles_gold.write.mode("overwrite").parquet("hdfs://localhost:9000/group_assignment/datalake/gold/articles")

                                                                                

# Analysing Main Table

In [36]:
# Join the three tables

transactions_total = (transactions_gold.alias("t")
                                   .join(customers_std.alias("c"),F.col("t.customer_id")==F.col("c.customer_id"),"inner")
                                   .join(articles_gold.alias("a"),F.col("t.article_id")==F.col("a.article_id"),"inner")
                                   .select("t.customer_id","a.prod_name",
                                           "c.club_member_status",
                                           "c.age_range",
                                           "a.product_group_name",
                                           "c.Active",
                                           "t.year",
                                           "t.month",
                                           "a.index_group_name",
                                           "a.Colour_symbol",
                                           "a.prod_name").cache())

In [37]:
#frequent products buyed by clients in range 25-30

columns=[ "age_range","product_group_name",]
transactions_total.where("age_range='25-30'").groupBy(columns).agg(F.count("*").alias("count")).sort(F.col("count").desc()).toPandas()

                                                                                

Unnamed: 0,age_range,product_group_name,count
0,25-30,Garment Upper body,2766163
1,25-30,Garment Lower body,1530253
2,25-30,Garment Full body,823102
3,25-30,Swimwear,664194
4,25-30,Underwear,623378
5,25-30,Accessories,374208
6,25-30,Shoes,182971
7,25-30,Socks & Tights,158792
8,25-30,Nightwear,75996
9,25-30,Unknown,22597


In [38]:
#frequent products buyed by clients in range 20-25

columns=[ "age_range","product_group_name",]
transactions_total.where("age_range='20-25'").groupBy(columns).agg(F.count("*").alias("count")).sort(F.col("count").desc()).toPandas()

                                                                                

Unnamed: 0,age_range,product_group_name,count
0,20-25,Garment Upper body,2223410
1,20-25,Garment Lower body,1206357
2,20-25,Swimwear,616400
3,20-25,Garment Full body,577960
4,20-25,Underwear,554906
5,20-25,Accessories,348098
6,20-25,Shoes,117515
7,20-25,Socks & Tights,116265
8,20-25,Nightwear,52028
9,20-25,Unknown,17078


In [39]:
#age_range people that buy in web

(transactions_total.where("sales_channel_id==2").groupBy("age_range")
        .agg(
            F.count("*").alias("count"))
        .sort(F.col("count").desc())
        ).limit(10).toPandas()

                                                                                

Unnamed: 0,age_range,count
0,25-30,5345517
1,20-25,3934958
2,30-35,3310399
3,50-55,2138298
4,45-50,1919596
5,35-40,1559073
6,40-45,1408583
7,55-60,1314988
8,60-65,570467
9,< 20,514385


In [45]:
#colour prefered by age range in the product product_group_name

columns=["age_range","colour_symbol"]
(transactions_total.where("sales_channel_id==2 and product_group_name=='Garment Upper body'").groupBy(columns)
        .agg(
            F.count("*").alias("count"))
        .sort(F.col("count").desc())
        ).limit(10).toPandas()

                                                                                

Unnamed: 0,age_range,colour_symbol,count




In [41]:
#frequent customers 

frec_customer = transactions_total.groupBy("customer_id").count().where("count > 1000").orderBy(F.col("count").desc())
frec_customer.toPandas()

                                                                                

Unnamed: 0,customer_id,count
0,be1981ab818cf4ef6765b2ecaea7a2cbf14ccd6e8a7ee985513d9e8e53c6d91b,1895
1,b4db5e5259234574edfff958e170fe3a5e13b6f146752ca066abca3c156acc71,1441
2,49beaacac0c7801c2ce2d189efe525fe80b5d37e46ed05b50a4cd88e34d0748f,1364
3,a65f77281a528bf5c1e9f270141d601d116e1df33bf9df512f495ee06647a9cc,1361
4,cd04ec2726dd58a8c753e0d6423e57716fd9ebcf2f14ed6012e7e5bea016b4d6,1237
5,55d15396193dfd45836af3a6269a079efea339e875eff42cc0c228b002548a9d,1208
6,c140410d72a41ee5e2e3ba3d7f5a860f337f1b5e41c27cf9bda5517c8774f8fa,1170
7,8df45859ccd71ef1e48e2ee9d1c65d5728c31c46ae957d659fa4e5c3af6cc076,1169
8,03d0011487606c37c1b1ed147fc72f285a50c05f00b9712e0fc3da400c864296,1157
9,6cc121e5cc202d2bf344ffe795002bdbf87178054bcda2e57161f0ef810a4b55,1143


In [42]:
#months with more pruchases

time=["year","month"]

months = transactions_total.groupBy(time).count().orderBy(F.col("count").desc())
months.toPandas()

                                                                                

Unnamed: 0,year,month,count
0,2019,6,1906202
1,2019,7,1807494
2,2020,6,1764507
3,2019,5,1560319
4,2019,4,1476454
5,2018,10,1397040
6,2020,5,1361815
7,2020,7,1351502
8,2020,4,1340882
9,2019,3,1286750
