In [1]:
# Requirements
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m19.0 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=1068dd9c6b8b24b53613e75e5fba0d081befd81b27eb500447ae0db66319cc79
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

# French Bakery

## Preparing for the data

In [2]:
# Unziping file
!unzip '/content/french_bakery.zip'

Archive:  /content/french_bakery.zip
  inflating: Bakery sales.csv        


In [3]:
from pyspark import SparkContext
import re

sc = SparkContext.getOrCreate()

Columns' Explanation

- `ticket number`: identifier for every single transaction
- `article`: name of the product sold (in French)
- `quantity`: quantity sold
- `unit_price`: price per product
- `Objective`: Forecast the sales in order to ease the production planning

## Transform

In [4]:
# Get the rdd
lines = sc.textFile("Bakery sales.csv")
rdd = lines.map(lambda x: x.split(','))

# Remove header
header = rdd.first()
rdd = rdd.filter(lambda x: x!= header)

rdd.take(1)

[['0', '2021-01-02', '08:38', '150040.0', 'BAGUETTE', '1.0', '"0', '90 €"']]

In [5]:
# Some rows contain more element
rdd.map(lambda x: (len(x), 1)).reduceByKey(lambda x, y: x+y).collect()

[(8, 233984), (9, 21)]

In [6]:
# Some rows contain comma in their product's name,
#   that is: "PLATPREPARE6,50" and "PLATPREPARE7,00"
rdd.filter(lambda x: (len(x)==9)).collect() 

[['137071',
  '2021-07-12',
  '08:25',
  '187148.0',
  '"PLATPREPARE6',
  '50"',
  '1.0',
  '"0',
  '00 €"'],
 ['137604',
  '2021-07-12',
  '11:30',
  '187296.0',
  '"PLATPREPARE6',
  '50"',
  '2.0',
  '"6',
  '50 €"'],
 ['137605',
  '2021-07-12',
  '11:30',
  '187296.0',
  '"PLATPREPARE5',
  '50"',
  '1.0',
  '"5',
  '50 €"'],
 ['137698',
  '2021-07-12',
  '11:52',
  '187322.0',
  '"PLATPREPARE7',
  '00"',
  '3.0',
  '"7',
  '00 €"'],
 ['138743',
  '2021-07-13',
  '10:35',
  '187606.0',
  '"PLATPREPARE7',
  '00"',
  '4.0',
  '"7',
  '00 €"'],
 ['138901',
  '2021-07-13',
  '11:17',
  '187650.0',
  '"PLATPREPARE5',
  '50"',
  '1.0',
  '"5',
  '50 €"'],
 ['138903',
  '2021-07-13',
  '11:17',
  '187650.0',
  '"PLATPREPARE6',
  '00"',
  '1.0',
  '"6',
  '00 €"'],
 ['139134',
  '2021-07-13',
  '12:19',
  '187715.0',
  '"PLATPREPARE7',
  '00"',
  '1.0',
  '"7',
  '00 €"'],
 ['139559',
  '2021-07-13',
  '19:23',
  '187833.0',
  '"PLATPREPARE7',
  '00"',
  '1.0',
  '"7',
  '00 €"'],
 ['139560'

In [7]:
# Columns transformation
def columns_transfrom(row):
    id = row[3][:-2]
    date = row[1]
    time = row[2]
    product = row[4]
    
    quantity =  row[-3]
    unit_price = f"{row[-2]}.{row[-1]}"

    if len(row) == 9:
        product = f"{row[4]},{row[5]}"
    
    return (id, date, time, product, quantity, unit_price)

rdd = rdd.map(columns_transfrom)
rdd.take(3)

[('150040', '2021-01-02', '08:38', 'BAGUETTE', '1.0', '"0.90 €"'),
 ('150040', '2021-01-02', '08:38', 'PAIN AU CHOCOLAT', '3.0', '"1.20 €"'),
 ('150041', '2021-01-02', '09:14', 'PAIN AU CHOCOLAT', '2.0', '"1.20 €"')]

In [8]:
# Second column transformation
def columns_transfrom_2(row):

    id, date, time, product, quantity, unit_price = row
    
    year, month, date = date.split('-')
    year, month, date = int(year), int(month), int(date)

    hour, minute = time.split(':')
    hour, minute = int(hour), int(minute)

    quantity = re.search('\d(\.|")', quantity).group()[:-1]
    quantity = int(quantity)
    
    unit_price = re.search(r"(\d| )\.\d+", unit_price).group()
    unit_price = float(unit_price)
    
    return (id, year, month, date, hour, minute, product, quantity, unit_price)

rdd = rdd.map(columns_transfrom_2)
rdd.take(3)

[('150040', 2021, 1, 2, 8, 38, 'BAGUETTE', 1, 0.9),
 ('150040', 2021, 1, 2, 8, 38, 'PAIN AU CHOCOLAT', 3, 1.2),
 ('150041', 2021, 1, 2, 9, 14, 'PAIN AU CHOCOLAT', 2, 1.2)]

## Data analysis

In [9]:
# Most sold product
sorted(
    rdd.map(lambda x: (x[-3], x[-2]))\
    .reduceByKey(lambda x, y: x+y).collect(),

    key=lambda x: x[1], reverse=True
)

[('TRADITIONAL BAGUETTE', 116873),
 ('CROISSANT', 28500),
 ('PAIN AU CHOCOLAT', 24548),
 ('COUPE', 23487),
 ('BANETTE', 22898),
 ('BAGUETTE', 21739),
 ('CEREAL BAGUETTE', 7509),
 ('SPECIAL BREAD', 5520),
 ('FORMULE SANDWICH', 5379),
 ('TARTELETTE', 5032),
 ('BOULE 400G', 4778),
 ('CAMPAGNE', 4326),
 ('COOKIE', 3763),
 ('ECLAIR', 3654),
 ('VIK BREAD', 3633),
 ('COMPLET', 3563),
 ('FICELLE', 3449),
 ('MOISSON', 3394),
 ('BANETTINE', 3120),
 ('BOULE 200G', 3098),
 ('PAIN BANETTE', 3043),
 ('SANDWICH COMPLET', 2914),
 ('PAIN AUX RAISINS', 2753),
 ('PAIN', 2409),
 ('CROISSANT AMANDES', 2369),
 ('KOUIGN AMANN', 2367),
 ('QUIM BREAD', 2153),
 ('CAFE OU EAU', 1966),
 ('CHAUSSON AUX POMMES', 1946),
 ('PAIN CHOCO AMANDES', 1919),
 ('BOISSON 33CL', 1904),
 ('BAGUETTE GRAINE', 1880),
 ('SAND JB EMMENTAL', 1832),
 ('BRIOCHE', 1720),
 ('GRAND FAR BRETON', 1506),
 ('TRAITEUR', 1476),
 ('SEIGLE', 1418),
 ('PARIS BREST', 1294),
 ('FINANCIER X5', 1275),
 ('DEMI BAGUETTE', 1147),
 ('FLAN', 1097),
 ('MILL

In [10]:
# Transactions with the highest income
result = rdd.map(lambda x: (x[0], x[-1]*x[-2]))\
            .reduceByKey(lambda x, y: x+y)\
            .collect()
        
       
result = sorted(result, key=lambda x: x[1], reverse=True)[:10]
print('Transaction Id \t Income')
for item in result:
    print(f"{item[0]} \t\t {item[1]} €")

Transaction Id 	 Income
194199 		 79.7 €
256455 		 64.2 €
220468 		 61.2 €
197217 		 58.6 €
198481 		 57.2 €
198573 		 54.2 €
273733 		 52.800000000000004 €
272070 		 49.9 €
229276 		 49.0 €
270964 		 48.9 €


In [11]:
# Total income per year
result = rdd.map(lambda x: (x[1], x[-1]*x[-2]))\
            .reduceByKey(lambda x, y: x+y)\
            .collect()

print('Year \t Income')
for item in result:
    print(f"{item[0]} \t {item[1]:.2f} €")

Year 	 Income
2022 	 246857.83 €
2021 	 294123.72 €


In [12]:
# Total income per month
result = rdd.map(lambda x: (x[2], x[-1]*x[-2]))\
            .reduceByKey(lambda x, y: x+y)\
            .mapValues(lambda x: x/2)\
            .collect()

print('Month \t Avg Income')
for item in sorted(result, key=lambda x: x[1], reverse=True):
    print(f"{item[0]} \t {item[1]:.2f} €")

Month 	 Avg Income
8 	 49552.65 €
7 	 42699.34 €
5 	 28485.38 €
4 	 24284.75 €
6 	 24260.28 €
9 	 21611.80 €
3 	 19706.01 €
2 	 16978.22 €
1 	 14352.58 €
10 	 10938.83 €
12 	 8875.32 €
11 	 8745.62 €


In [13]:
# What time do transactions ted to happen?
#   morning       3.00 - 9.59
#   noon          10.00 - 17.59
#   night         18.00 - 2.59

def when_to_buy(row):
    when = 'night'
    if (row[4] >= 3) and (row[4] <= 9):
        when = 'morning'
    elif (row[4] >= 10) and (row[4] <= 17):
        when = 'noon'
    
    return (row[0], when)

result = rdd.map(when_to_buy)\
            .reduceByKey(lambda x, y: x)\
            .map(lambda x: (x[1], 1))\
            .reduceByKey(lambda x, y: x+y)\
            .collect()

for item in result:
    print(f"{item[0]}    \t{item[1]} transcations")

noon    	93182 transcations
night    	7090 transcations
morning    	36179 transcations


# Instagram App Store Reviews

## Preparing for the data

In [14]:
from pyspark import SparkContext
import re

sc = SparkContext.getOrCreate()

In [15]:
# Get the rdd 
rdd = sc.textFile("Instagram.csv") 

rdd.take(3)

['reviewId,content,score',
 '83c7935d-d791-4d9c-b738-c7984992491a,It is a best app,5',
 '2d56d1a3-a4d0-4482-ae45-8394d2b8951f,"I love Insta, my No1 social media app❤️",5']

## Transform

In [16]:
# Remove header
header = rdd.first()
rdd = rdd.filter(lambda x: x!= header)

# Get the information about rating and review
def parser(line):
    row = line.split(',')
    rating, review = row[-1], row[1]
    for i in range(2, len(row)-1):
        review += row[i]
    
    return (rating, review)

rdd = rdd.map(parser)

rdd.take(2)

[('5', 'It is a best app'), ('5', '"I love Insta my No1 social media app❤️"')]

In [17]:
# Drop rows with missing values in ratings
missing_ratings = rdd.filter(lambda x: re.search('^\D', x[0])).collect()
rdd = rdd.filter(lambda x: x not in missing_ratings)

missing_ratings

[('"Cool app... Watch only what u follow... Free from politics and ads... ',
  '"Cool app... Watch only what u follow... Free from politics and ads... ')]

## Data analysis

In [18]:
# Distribution of ratings
result = rdd.mapValues(lambda x: 1)\
            .reduceByKey(lambda x, y: x+y)\
            .collect()

for item in result:
    print(f"{int(item[0])}   {item[1]}")

1   1604
4   836
5   6643
3   526
2   391


### Most common words in "Good" ratings review

In [19]:
# Good ratings RDD
goodRDD = rdd.filter(lambda x: x[0] in ['4', '5'])
goodRDD.take(5)

[('5', 'It is a best app'),
 ('5', '"I love Insta my No1 social media app❤️"'),
 ('5', '"THIS IS A VERY GOOD APP 👌🙂"'),
 ('5', 'Nice app'),
 ('5', 'Good reel and good dance')]

In [20]:
# Flatmap to normalize words
goodRDD = goodRDD.flatMap(lambda x: re.compile(r'\W+').split(x[1].lower()))

# Results
sorted(
    goodRDD.map(lambda x: (x, 1))\
        .reduceByKey(lambda x, y: x + y)\
        .collect(),
       
    key=lambda x: x[1], reverse=True
)[:50]

[('', 2624),
 ('app', 1710),
 ('good', 1342),
 ('i', 1080),
 ('nice', 1068),
 ('very', 897),
 ('it', 768),
 ('is', 669),
 ('and', 623),
 ('instagram', 616),
 ('this', 613),
 ('to', 552),
 ('the', 535),
 ('best', 505),
 ('my', 471),
 ('love', 463),
 ('a', 388),
 ('s', 365),
 ('you', 318),
 ('for', 313),
 ('but', 270),
 ('so', 262),
 ('super', 256),
 ('like', 232),
 ('please', 215),
 ('of', 206),
 ('great', 203),
 ('amazing', 198),
 ('not', 197),
 ('me', 188),
 ('in', 184),
 ('aap', 158),
 ('t', 153),
 ('awesome', 146),
 ('can', 141),
 ('excellent', 137),
 ('have', 134),
 ('that', 134),
 ('with', 130),
 ('use', 119),
 ('on', 106),
 ('social', 103),
 ('hai', 102),
 ('insta', 99),
 ('your', 98),
 ('application', 95),
 ('are', 92),
 ('all', 92),
 ('really', 90),
 ('time', 89)]

### Most common words in "Bad" ratings review

In [21]:
# Bad ratings RDD
badRDD = rdd.filter(lambda x: x[0] in ['1', '2'])
badRDD.take(5)

[('1',
  "I would have given you 0 stars if i could. Fix your algorithm I'm being spammed by sex workers acounts with sketchy Links that Instagram don't see a problem in when I report it. I'm starting to hate this app and it's stupid algorithm. Just make the mobile version like the desktop version and stop this shitfest!"),
 ('1', 'Worst I want to uninstall this'),
 ('1', 'Reply problem please solved this problem'),
 ('1', "It's hanging"),
 ('1',
  '"Internet connectivity gets bugged way too often  re-downloading fixes it but not for long and you\'ll have to do the same again over and over. Ok so now re-downloading also doesn\'t help very nice 🙂"')]

In [22]:
# Flatmap to normalize words
badRDD = badRDD.flatMap(lambda x: re.compile(r'\W+').split(x[1].lower()))

# Results
sorted(
    badRDD.map(lambda x: (x, 1))\
        .reduceByKey(lambda x, y: x + y)\
        .collect(),
       
    key=lambda x: x[1], reverse=True
)[:50]

[('', 1045),
 ('i', 963),
 ('to', 634),
 ('the', 590),
 ('it', 564),
 ('my', 553),
 ('and', 544),
 ('not', 483),
 ('app', 460),
 ('is', 455),
 ('t', 423),
 ('instagram', 354),
 ('this', 327),
 ('a', 315),
 ('in', 264),
 ('of', 245),
 ('account', 220),
 ('can', 213),
 ('but', 210),
 ('for', 207),
 ('s', 195),
 ('on', 194),
 ('you', 191),
 ('please', 182),
 ('me', 179),
 ('are', 152),
 ('very', 150),
 ('that', 148),
 ('have', 143),
 ('good', 140),
 ('so', 137),
 ('problem', 130),
 ('don', 129),
 ('no', 124),
 ('many', 120),
 ('as', 117),
 ('with', 113),
 ('all', 109),
 ('update', 107),
 ('working', 106),
 ('when', 104),
 ('even', 101),
 ('bad', 101),
 ('option', 99),
 ('there', 95),
 ('am', 93),
 ('like', 89),
 ('post', 85),
 ('nice', 84),
 ('now', 83)]