# DataFrame Creation

## Extract link from repository

Given the repository "input1" which contains all the files, we build a list that contains all the files names. These latter will be useful to build our dataframe since we need the name of each City and the ID of each store.

In [1]:
mypath = 'input1/'
from os import listdir
from os.path import isfile, join
files = [f for f in listdir(mypath) if isfile(join(mypath, f))]
link = [mypath+f for f in files]
link

['input1/paris_3.txt',
 'input1/paris_1.txt',
 'input1/nice.txt',
 'input1/lyon.txt',
 'input1/toulouse.txt',
 'input1/marseilles_1.txt',
 'input1/marseilles_2.txt',
 'input1/troyes.txt',
 'input1/rennes.txt',
 'input1/paris_2.txt',
 'input1/anger.txt',
 'input1/orlean.txt',
 'input1/nantes.txt']

## Extract City and ID from link

In this section, we made a function that takes a link to a file as input and return the correspending City and Store ID.
The store ID is the name of the city and the number of the store, if the city has only one store, we shall take City_1 as Store ID.

In [2]:
def extractCityID(link):
  # Function that compute the name of the city and its ID given the link 
  x = link.split('/')
  y = x[len(x)-1].split('.')[0] 
  z = y.split('_')
  if (len(z)>1):
    City = y.split('_')[0]
    ID = y.split('_')[1]
    return City, y
  else:
    City = y.split('_')[0]
    return City, y+'_1'

In [3]:
extractCityID(link[2])

('nice', 'nice_1')

## Create the data Frame 

To create the dataframe, we proceed by creating an RDD that contains the requested Rows and then, using an sqlContext, we create our dataframe passing the RDD as input.

To do so, we start by intializing four lists. The first one is filled by the city's and store id's using "extractCityID" function.
The second one accesses to the files to to take out the data. The third split the data into Month's and Revenue's. And the last one uses the latter three to make an RDD that contains Rows as asked in the lab.
We used also a temporary RDD to make union of all results after the for loop computation. 

In [4]:
from pyspark.sql import Row

cityId = []
lines = []
parts = []
dataF = []
cityId.append(extractCityID(link[0]))
lines.append(sc.textFile(link[0]))
parts.append(lines[0].map(lambda l: l.split(" ")))
dataF.append(parts[0].map(lambda p: Row(ID = cityId[0][1], City = cityId[0][0], Month = p[0], Revenue = int(p[1]))))
temp = parts[0].map(lambda p: Row(ID = cityId[0][1], City = cityId[0][0], Month = p[0], Revenue = int(p[1])))

dataF[0].take(10)
for i in range(1,len(link)):
  cityId.append(extractCityID(link[i]))
  lines.append(sc.textFile(link[i])) 
  parts.append(lines[i].map(lambda l: l.split(" ")))
  dataF.append(parts[i].map(lambda p: Row(ID = cityId[i][1], City = cityId[i][0], Month = p[0], Revenue = int(p[1]))))
  temp = temp.union(dataF[i])
  

In [5]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame(temp.map(lambda x: [x[1],x[0],x[2],x[3]]), 
                                schema=['store_ID', 'City', 'Month', 'Revenue'])    
df.show()

+--------+-----+-----+-------+
|store_ID| City|Month|Revenue|
+--------+-----+-----+-------+
| paris_3|paris|  JAN|     21|
| paris_3|paris|  FEB|     36|
| paris_3|paris|  MAR|     24|
| paris_3|paris|  APR|     24|
| paris_3|paris|  MAY|     23|
| paris_3|paris|  JUN|     25|
| paris_3|paris|  JUL|     21|
| paris_3|paris|  AUG|     38|
| paris_3|paris|  SEP|     38|
| paris_3|paris|  OCT|     38|
| paris_3|paris|  NOV|     21|
| paris_3|paris|  DEC|     21|
| paris_1|paris|  JAN|     51|
| paris_1|paris|  FEB|     21|
| paris_1|paris|  MAR|     11|
| paris_1|paris|  APR|     57|
| paris_1|paris|  MAY|     55|
| paris_1|paris|  JUN|     55|
| paris_1|paris|  JUL|     61|
| paris_1|paris|  AUG|     42|
+--------+-----+-----+-------+
only showing top 20 rows



# Average monthly income of the shop in France

The average monthly income of the shop in Frane is obtained by grouping on the "Month" by taking the averag of "Revenue".

In [24]:
amsf = (df
        .groupBy("Month")
        .avg("Revenue")
        .orderBy("avg(Revenue)"))
  
amsf.show()


+-----+------------------+
|Month|      avg(Revenue)|
+-----+------------------+
|  MAR| 17.53846153846154|
|  FEB|19.153846153846153|
|  APR| 20.23076923076923|
|  JAN| 20.76923076923077|
|  JUL|21.692307692307693|
|  MAY| 22.46153846153846|
|  AUG|23.076923076923077|
|  NOV| 24.53846153846154|
|  SEP| 25.53846153846154|
|  OCT| 26.53846153846154|
|  JUN|27.846153846153847|
|  DEC|              29.0|
+-----+------------------+



# Average monthly income of the shop in each city

Same request as the previous but we have to group by the "City" in addition to the "Month"

In [25]:
amsc = (df
        .groupBy("City", "Month")
        .mean())

amsc.orderBy("City").show()

+-----+-----+------------+
| City|Month|avg(Revenue)|
+-----+-----+------------+
|anger|  DEC|        16.0|
|anger|  MAY|        12.0|
|anger|  OCT|         8.0|
|anger|  NOV|        14.0|
|anger|  JUN|        15.0|
|anger|  MAR|        14.0|
|anger|  JUL|        19.0|
|anger|  FEB|        12.0|
|anger|  APR|        15.0|
|anger|  AUG|        15.0|
|anger|  JAN|        13.0|
|anger|  SEP|        13.0|
| lyon|  OCT|        11.0|
| lyon|  FEB|        12.0|
| lyon|  AUG|        25.0|
| lyon|  MAR|        14.0|
| lyon|  MAY|        12.0|
| lyon|  JAN|        13.0|
| lyon|  NOV|        22.0|
| lyon|  DEC|        22.0|
+-----+-----+------------+
only showing top 20 rows



# Total revenue per city per year

The total revenue per city per year is obtained by grouping over "City" and taking the sum of "Revenue"

In [14]:
trcy = (df
        .groupBy("City")
        .sum())

trcy.show()

+----------+------------+
|      City|sum(Revenue)|
+----------+------------+
|    nantes|         207|
|marseilles|         515|
|  toulouse|         177|
|     anger|         166|
|    orlean|         196|
|     paris|        1568|
|      lyon|         193|
|    rennes|         180|
|      nice|         203|
|    troyes|         214|
+----------+------------+



# Total revenue per store per year

Same as before but we have to group by the "store_ID" instead of grouping by the "City".

In [16]:
trsy = (df
        .groupBy("store_ID")
        .sum())

trsy.orderBy("sum(Revenue)").show()

+------------+------------+
|    store_ID|sum(Revenue)|
+------------+------------+
|     anger_1|         166|
|  toulouse_1|         177|
|    rennes_1|         180|
|      lyon_1|         193|
|    orlean_1|         196|
|      nice_1|         203|
|    nantes_1|         207|
|    troyes_1|         214|
|marseilles_2|         231|
|marseilles_1|         284|
|     paris_3|         330|
|     paris_1|         596|
|     paris_2|         642|
+------------+------------+



# The store that achieves the best performance in each month

Computing the store that achives the best performance in each month is not obvious as the latters. 
In fact, we have to proceed by doing some request before operating on the dataframe.

We firstly compute a new dataframe that contains only "Month" and the maximumof "Revenue"

In [19]:
df2 = (df
       .select("Month","Revenue")
       .groupBy("Month")
       .max("Revenue"))

df2.show()

+-----+------------+
|Month|max(Revenue)|
+-----+------------+
|  FEB|          42|
|  OCT|          68|
|  NOV|          64|
|  MAR|          44|
|  MAY|          72|
|  DEC|          71|
|  AUG|          45|
|  SEP|          63|
|  JAN|          51|
|  JUL|          61|
|  APR|          57|
|  JUN|          85|
+-----+------------+



Then we take inner join between our dataframe and the dataframe just created.

In [21]:
dfJoin = (df
          .join(df2, df.Month == df2.Month, 'inner')
          .drop(df.Month))

dfJoin.show()

+------------+----------+-------+-----+------------+
|    store_ID|      City|Revenue|Month|max(Revenue)|
+------------+----------+-------+-----+------------+
|     paris_3|     paris|     36|  FEB|          42|
|     paris_1|     paris|     21|  FEB|          42|
|      nice_1|      nice|     15|  FEB|          42|
|      lyon_1|      lyon|     12|  FEB|          42|
|  toulouse_1|  toulouse|     13|  FEB|          42|
|marseilles_1|marseilles|     21|  FEB|          42|
|marseilles_2|marseilles|     11|  FEB|          42|
|    troyes_1|    troyes|     21|  FEB|          42|
|    rennes_1|    rennes|     18|  FEB|          42|
|     paris_2|     paris|     42|  FEB|          42|
|     anger_1|     anger|     12|  FEB|          42|
|    orlean_1|    orlean|     12|  FEB|          42|
|    nantes_1|    nantes|     15|  FEB|          42|
|     paris_3|     paris|     38|  OCT|          68|
|     paris_1|     paris|     68|  OCT|          68|
|      nice_1|      nice|     18|  OCT|       

Now we have a dataframe that contains the previous dataframe with a newcolumn that contains the maximum of "Revenue".

Obviously, the result that we seek is just the "Month" and the "store_ID" when the "Revenue" is equal to the maximum of the "Revenue"

In [22]:
dfRes = (dfJoin
         .withColumnRenamed("max(Revenue)", "Maximum")
         .filter("Revenue = Maximum"))
dfRes.show()

+--------+-----+-------+-----+-------+
|store_ID| City|Revenue|Month|Maximum|
+--------+-----+-------+-----+-------+
| paris_2|paris|     72|  MAY|     72|
| paris_2|paris|     85|  JUN|     85|
| paris_1|paris|     71|  DEC|     71|
| paris_1|paris|     68|  OCT|     68|
| paris_2|paris|     42|  FEB|     42|
| paris_2|paris|     45|  AUG|     45|
| paris_2|paris|     64|  NOV|     64|
| paris_1|paris|     61|  JUL|     61|
| paris_1|paris|     51|  JAN|     51|
| paris_2|paris|     63|  SEP|     63|
| paris_2|paris|     44|  MAR|     44|
| paris_1|paris|     57|  APR|     57|
+--------+-----+-------+-----+-------+



To do so, we changed the name of the last column to make the request easier, and we filtered the dataframe over the condition "Revenue" = "Maximum"

And hence, the result is just the selection of the "Month" and the "store_ID"

In [23]:
sabp = (dfRes
        .select("Month", "store_ID"))

sabp.show()

+-----+--------+
|Month|store_ID|
+-----+--------+
|  MAY| paris_2|
|  JUN| paris_2|
|  DEC| paris_1|
|  OCT| paris_1|
|  FEB| paris_2|
|  AUG| paris_2|
|  NOV| paris_2|
|  JUL| paris_1|
|  JAN| paris_1|
|  SEP| paris_2|
|  MAR| paris_2|
|  APR| paris_1|
+-----+--------+

