In [1]:
import findspark
# TO use init without passing a parameter, you have to define the env. variable SPARK_HOME 
findspark.init()
import pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark import SparkContext
spark = SparkSession.builder.appName("Final-Project").getOrCreate()

In [3]:
import re
from pyspark.sql.functions import lit
df2 = spark.sparkContext.wholeTextFiles('input/')
df2 = df2.collect()
cities = {}
schema = StructType([
  StructField('Month', StringType(), True),
  StructField('Income', StringType(), True),
  StructField('city', StringType(), True),
  StructField('Num_of_shop', StringType(), True),
  StructField('Id_shop', StringType(), True)
  ])
final_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)

for row in df2:
    city = re.search(r"(?<=input/).*?(?=.txt)", row[0]).group(0)
    id = re.search(r"(?<=input/).*?(?=.txt)", row[0]).group(0)
    if re.search(r".*?(?=_)", city):
        city = re.search(r".*?(?=_)", city).group(0)
    if city in cities.keys():
        cities[city] += 1
    else:
        cities[city] = 1
    tmp = spark.sparkContext.textFile(row[0])
    df = tmp.map(lambda k: k.split(" "))
    df = df.toDF()
    df = df.withColumnRenamed("_1", "Month").withColumnRenamed("_2", "Income")
    df = df.withColumn("city", lit(city))
    df = df.withColumn("Num_of_shop", lit(cities[city]))
    df = df.withColumn("Id_shop", lit(id))
    final_df = final_df.union(df)
final_df = final_df.withColumn("Income", final_df["Income"].cast(IntegerType()))   
final_df.printSchema()
final_df.show()

root
 |-- Month: string (nullable = true)
 |-- Income: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- Num_of_shop: string (nullable = true)
 |-- Id_shop: string (nullable = true)

+-----+------+-----+-----------+-------+
|Month|Income| city|Num_of_shop|Id_shop|
+-----+------+-----+-----------+-------+
|  JAN|    13|anger|          1|  anger|
|  FEB|    12|anger|          1|  anger|
|  MAR|    14|anger|          1|  anger|
|  APR|    15|anger|          1|  anger|
|  MAY|    12|anger|          1|  anger|
|  JUN|    15|anger|          1|  anger|
|  JUL|    19|anger|          1|  anger|
|  AUG|    15|anger|          1|  anger|
|  SEP|    13|anger|          1|  anger|
|  OCT|     8|anger|          1|  anger|
|  NOV|    14|anger|          1|  anger|
|  DEC|    16|anger|          1|  anger|
|  JAN|    13| lyon|          1|   lyon|
|  FEB|    12| lyon|          1|   lyon|
|  MAR|    14| lyon|          1|   lyon|
|  APR|    15| lyon|          1|   lyon|
|  MAY|    12| lyon| 

In [4]:
# Average monthly income of the shop in France

final_df.agg({"Income":"avg"}).show()

+-----------------+
|      avg(Income)|
+-----------------+
|23.19871794871795|
+-----------------+



In [5]:
# Average monthly income of the shop in each city

final_df.groupby('city').agg({"Income":"avg"}).orderBy('city').show()

+----------+------------------+
|      city|       avg(Income)|
+----------+------------------+
|     anger|13.833333333333334|
|      lyon|16.083333333333332|
|marseilles|21.458333333333332|
|    nantes|             17.25|
|      nice|16.916666666666668|
|    orlean|16.333333333333332|
|     paris| 43.55555555555556|
|    rennes|              15.0|
|  toulouse|             14.75|
|    troyes|17.833333333333332|
+----------+------------------+



In [6]:
# Total revenue per city per year

final_df.groupby('city').agg({"Income":"sum"}).orderBy("city").show()

+----------+-----------+
|      city|sum(Income)|
+----------+-----------+
|     anger|        166|
|      lyon|        193|
|marseilles|        515|
|    nantes|        207|
|      nice|        203|
|    orlean|        196|
|     paris|       1568|
|    rennes|        180|
|  toulouse|        177|
|    troyes|        214|
+----------+-----------+



In [7]:
# Total revenue per store per year

final_df.groupby('Id_shop').agg({"Income":"sum"}).orderBy("Id_shop").show()

+------------+-----------+
|     Id_shop|sum(Income)|
+------------+-----------+
|       anger|        166|
|        lyon|        193|
|marseilles_1|        284|
|marseilles_2|        231|
|      nantes|        207|
|        nice|        203|
|      orlean|        196|
|     paris_1|        596|
|     paris_2|        642|
|     paris_3|        330|
|      rennes|        180|
|    toulouse|        177|
|      troyes|        214|
+------------+-----------+



In [8]:
# The store that achieves the best performance in each month
df = final_df.groupby('Month').agg({"Income":"max"}).withColumnRenamed("max(Income)", "Max")

In [9]:
final = final_df.join(df, (final_df.Income == df.Max) & (final_df.Month == df.Month), how='leftsemi')
final.show()
final.select('Month', 'Income', 'Id_shop').show()

+-----+------+-----+-----------+-------+
|Month|Income| city|Num_of_shop|Id_shop|
+-----+------+-----+-----------+-------+
|  APR|    57|paris|          1|paris_1|
|  JAN|    51|paris|          1|paris_1|
|  OCT|    68|paris|          1|paris_1|
|  JUL|    61|paris|          1|paris_1|
|  DEC|    71|paris|          1|paris_1|
|  MAY|    72|paris|          2|paris_2|
|  SEP|    63|paris|          2|paris_2|
|  JUN|    85|paris|          2|paris_2|
|  NOV|    64|paris|          2|paris_2|
|  FEB|    42|paris|          2|paris_2|
|  MAR|    44|paris|          2|paris_2|
|  AUG|    45|paris|          2|paris_2|
+-----+------+-----+-----------+-------+

+-----+------+-------+
|Month|Income|Id_shop|
+-----+------+-------+
|  APR|    57|paris_1|
|  JAN|    51|paris_1|
|  OCT|    68|paris_1|
|  JUL|    61|paris_1|
|  DEC|    71|paris_1|
|  MAY|    72|paris_2|
|  SEP|    63|paris_2|
|  JUN|    85|paris_2|
|  NOV|    64|paris_2|
|  FEB|    42|paris_2|
|  MAR|    44|paris_2|
|  AUG|    45|paris_2