In [6]:
# Import de Spark Session et SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.conf import SparkConf

conf = SparkConf()
conf.set("spark.log.level", "error")  # To display only errors
conf.set("spark.ui.showConsoleProgress", "false")  # To not display Spark job progress in Python

# Define a SparkContext
sc = SparkContext.getOrCreate(conf=conf)

# Define a SparkSession
spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Introduction to DataFrame") \
    .getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Setting Spark log level to "ERROR".


In [7]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd = rdd.map(lambda x: x*2)
rdd.take(5)

[2, 4, 6, 8, 10]

In [8]:
sc.stop()

In [9]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("Spark Course").setMaster("local[*]")
sc = SparkContext(conf=conf)

Setting Spark log level to "ERROR".


In [10]:
rdd = sc.parallelize(range(10))

In [11]:
print(rdd)

PythonRDD[1] at RDD at PythonRDD.scala:53


In [12]:
print(rdd.collect())

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


In [13]:
print(rdd.getNumPartitions())

2


In [14]:
rdd = sc.parallelize(range(50), 10)
print(rdd.glom().collect())

[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14], [15, 16, 17, 18, 19], [20, 21, 22, 23, 24], [25, 26, 27, 28, 29], [30, 31, 32, 33, 34], [35, 36, 37, 38, 39], [40, 41, 42, 43, 44], [45, 46, 47, 48, 49]]


In [15]:
def map(L, f):
    return([f(element) for element in L])

def f(integer):
    return(2*integer)

L = list(range(10))
print(map(L, f))

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


In [16]:
matrix = ([[1,2],
           [3,4]])

flattened = [x for row in matrix for x in row]
print(flattened == [1, 2, 3, 4])

True


In [17]:
def f(integer):
    return([integer, 2*integer])

In [18]:
L = list(range(9))

def F(partition):
    for element in partition:
        if element%2 == 0:
            yield element

generateur = f(L)

for element in generateur:
    print(element)

[0, 1, 2, 3, 4, 5, 6, 7, 8]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 0, 1, 2, 3, 4, 5, 6, 7, 8]


In [19]:
def f(a,b):
    return(a+b)

In [20]:
def Reduce(f, L):
    while len(L) > 1:
        L[1] = f(L[0], L[1])
        L.pop(0)
    return(L)

L = list(range(10))
print(Reduce(lambda x,y: x+y, L))

[45]


In [21]:
def reduce(f, L):
    while len(L) > 1:
        L[1] = f(L[0], L[1])
        L.pop(0)
    return(L)

def reduceGlobal(f, L):
    for i, element in enumerate(L):
        reduce(f, element)
        L[i] = element[0]
    return(reduce(f, L))

L = [[0,1,2], [3,4,5], [6,7,8,9]]
print(reduceGlobal(lambda x,y: x+y, L))

[45]


In [22]:
count = 0
rdd = sc.parallelize([1,2,3,4,5])

def add_count(x):
    global count
    count += x

rdd.foreach(add_count)
print(count)

0


In [23]:
count = sc.accumulator(0)
rdd = sc.parallelize([1,2,3,4,5])

def add_count(x):
    count.add(x)

rdd.foreach(add_count)
print(count.value)

15


In [24]:
pi = sc.broadcast(3.14)
print(pi.value)

3.14


In [25]:
rdd = sc.parallelize(range(50))
print(rdd.map(lambda x: x**2).collect())

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401]


In [26]:
rdd = sc.parallelize(range(50))
print(rdd.map(lambda x: [x, x**2]).collect())

[[0, 0], [1, 1], [2, 4], [3, 9], [4, 16], [5, 25], [6, 36], [7, 49], [8, 64], [9, 81], [10, 100], [11, 121], [12, 144], [13, 169], [14, 196], [15, 225], [16, 256], [17, 289], [18, 324], [19, 361], [20, 400], [21, 441], [22, 484], [23, 529], [24, 576], [25, 625], [26, 676], [27, 729], [28, 784], [29, 841], [30, 900], [31, 961], [32, 1024], [33, 1089], [34, 1156], [35, 1225], [36, 1296], [37, 1369], [38, 1444], [39, 1521], [40, 1600], [41, 1681], [42, 1764], [43, 1849], [44, 1936], [45, 2025], [46, 2116], [47, 2209], [48, 2304], [49, 2401]]


In [27]:
rdd = sc.parallelize(range(50))

print(rdd.flatMap(lambda x: [x, x**2]).collect())



[0, 0, 1, 1, 2, 4, 3, 9, 4, 16, 5, 25, 6, 36, 7, 49, 8, 64, 9, 81, 10, 100, 11, 121, 12, 144, 13, 169, 14, 196, 15, 225, 16, 256, 17, 289, 18, 324, 19, 361, 20, 400, 21, 441, 22, 484, 23, 529, 24, 576, 25, 625, 26, 676, 27, 729, 28, 784, 29, 841, 30, 900, 31, 961, 32, 1024, 33, 1089, 34, 1156, 35, 1225, 36, 1296, 37, 1369, 38, 1444, 39, 1521, 40, 1600, 41, 1681, 42, 1764, 43, 1849, 44, 1936, 45, 2025, 46, 2116, 47, 2209, 48, 2304, 49, 2401]


In [28]:
rdd = sc.parallelize(range(50), 3)




In [29]:
print(rdd.glom().collect())




[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], [16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32], [33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]]


In [30]:
def partition_sum(partition):

    return([sum(partition)])



In [31]:
print(rdd.mapPartitions(partition_sum).collect())




[120, 408, 697]


In [32]:
rdd = sc.parallelize(range(10), 3)




In [33]:
def add(x, y):
    return(x+y)

print(rdd.reduce(add))
print(rdd.reduce(lambda x, y: x+y))

45
45


In [34]:
rdd = sc.parallelize(range(50))

def odd_key(value):
    if value%2 == 0:
        return(0, value)
    else:
        return(1, value)

rdd_one = rdd.map(odd_key)
print(rdd_one.reduceByKey(lambda x, y: x+y).collect())

[(0, 600), (1, 625)]


In [35]:
rdd = sc.parallelize(range(50))

def odd_key(value):
    if value%2 == 0:
        return(0, value)
    else:
        return(1, value)

rdd_one = rdd.map(odd_key)
print(rdd_one.sortByKey().collect())

[(0, 0), (0, 2), (0, 4), (0, 6), (0, 8), (0, 10), (0, 12), (0, 14), (0, 16), (0, 18), (0, 20), (0, 22), (0, 24), (0, 26), (0, 28), (0, 30), (0, 32), (0, 34), (0, 36), (0, 38), (0, 40), (0, 42), (0, 44), (0, 46), (0, 48), (1, 1), (1, 3), (1, 5), (1, 7), (1, 9), (1, 11), (1, 13), (1, 15), (1, 17), (1, 19), (1, 21), (1, 23), (1, 25), (1, 27), (1, 29), (1, 31), (1, 33), (1, 35), (1, 37), (1, 39), (1, 41), (1, 43), (1, 45), (1, 47), (1, 49)]


In [36]:
rdd = sc.parallelize(range(50))


rdd_even = rdd.filter(lambda x: x%2 == 0)
rdd_odd = rdd.filter(lambda x: x%2 == 1)

print(rdd_even.count(), rdd_odd.count())
print(rdd_even.reduce(lambda x, y: x+y), rdd_odd.reduce(lambda x, y: x+y))



25 25
600 625


In [37]:
rdd = sc.parallelize(range(50))

rdd_one = rdd.map(lambda x: (x%2, x))




In [38]:
def partitioning(key):
    return key % 2


In [39]:
rdd_one = rdd_one.partitionBy(2, partitioning)

print(rdd_one.glom().collect())




[[(0, 0), (0, 2), (0, 4), (0, 6), (0, 8), (0, 10), (0, 12), (0, 14), (0, 16), (0, 18), (0, 20), (0, 22), (0, 24), (0, 26), (0, 28), (0, 30), (0, 32), (0, 34), (0, 36), (0, 38), (0, 40), (0, 42), (0, 44), (0, 46), (0, 48)], [(1, 1), (1, 3), (1, 5), (1, 7), (1, 9), (1, 11), (1, 13), (1, 15), (1, 17), (1, 19), (1, 21), (1, 23), (1, 25), (1, 27), (1, 29), (1, 31), (1, 33), (1, 35), (1, 37), (1, 39), (1, 41), (1, 43), (1, 45), (1, 47), (1, 49)]]


In [40]:
!wget https://assets-datascientest.s3.eu-west-1.amazonaws.com/shakespeare.txt

--2025-05-21 11:43:41--  https://assets-datascientest.s3.eu-west-1.amazonaws.com/shakespeare.txt
Resolving assets-datascientest.s3.eu-west-1.amazonaws.com (assets-datascientest.s3.eu-west-1.amazonaws.com)... 52.92.2.50, 52.218.56.104, 3.5.68.232, ...
Connecting to assets-datascientest.s3.eu-west-1.amazonaws.com (assets-datascientest.s3.eu-west-1.amazonaws.com)|52.92.2.50|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 293326 (286K) [text/plain]
Saving to: ‘shakespeare.txt.2’


2025-05-21 11:43:41 (25.4 MB/s) - ‘shakespeare.txt.2’ saved [293326/293326]



In [41]:
rdd = sc.textFile("shakespeare.txt")
rdd_clean = rdd.flatMap(lambda x: x.replace('\'', ' ').lower().split())

In [42]:
rdd_count = rdd_clean.map(lambda x: (x, 1))


In [43]:
print(rdd_count.reduceByKey(lambda x,y : x+y).sortBy(lambda x: x[1], False).collect())







In [44]:
rdd_really_clean = rdd_clean.filter(lambda word: len(word) > 1)
rdd_count = rdd_really_clean.map(lambda x: (x, 1))
print(rdd_count.reduceByKey(lambda x,y : x+y).sortBy(lambda x: x[1], False).collect())



In [45]:
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("Spark Course").master("local[*]").getOrCreate()
sc = spark.sparkContext

In [46]:
L = [[0, "Alexandre", 28, "Chatou", "December"], 
     [1, "Damien", 24, "Courbevoie", "November"], 
     [2, "Dimitri", 25, "Sibiu", "October"], 
     [3, "Sacha", 32, "Poissy", "September"], 
     [4, "Victor", 21, "Montrouge", "June"]]

rdd = sc.parallelize(L)



In [47]:
df = rdd.toDF()
df.printSchema() 
df.show()

'''
The integers here are deduced to be of long type which is too heavy to store what we want here, interger type would be more suitable. 
We do not have our desired column names which is logical because we did not specify those at any moment.
'''

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)
 |-- _4: string (nullable = true)
 |-- _5: string (nullable = true)

+---+---------+---+----------+---------+
| _1|       _2| _3|        _4|       _5|
+---+---------+---+----------+---------+
|  0|Alexandre| 28|    Chatou| December|
|  1|   Damien| 24|Courbevoie| November|
|  2|  Dimitri| 25|     Sibiu|  October|
|  3|    Sacha| 32|    Poissy|September|
|  4|   Victor| 21| Montrouge|     June|
+---+---------+---+----------+---------+



'\nThe integers here are deduced to be of long type which is too heavy to store what we want here, interger type would be more suitable. \nWe do not have our desired column names which is logical because we did not specify those at any moment.\n'

In [48]:
from pyspark.sql.types import *


L = [[0, "Alexandre", 28, "Chatou", "December"], 
     [1, "Damien", 24, "Courbevoie", "November"], 
     [2, "Dimitri", 25, "Sibiu", "October"], 
     [3, "Sacha", 32, "Poissy", "September"], 
     [4, "Victor", 21, "Montrouge", "June"]]

rdd = sc.parallelize(L)
schema = StructType([
    StructField("", IntegerType(), False), 
    StructField("FirstName", StringType()), 
    StructField("Age", IntegerType()), 
    StructField("City", StringType()), 
    StructField("Cohort", StringType())
])

df = spark.createDataFrame(rdd, schema)
df.printSchema()
df.show()

root
 |-- : integer (nullable = false)
 |-- FirstName: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Cohort: string (nullable = true)

+---+---------+---+----------+---------+
|   |FirstName|Age|      City|   Cohort|
+---+---------+---+----------+---------+
|  0|Alexandre| 28|    Chatou| December|
|  1|   Damien| 24|Courbevoie| November|
|  2|  Dimitri| 25|     Sibiu|  October|
|  3|    Sacha| 32|    Poissy|September|
|  4|   Victor| 21| Montrouge|     June|
+---+---------+---+----------+---------+



In [49]:
df_csv = spark.read.option("header", True).option("inferSchema", True).csv("csv_comma.csv")
df_csv.printSchema()
df_csv.show()

root
 |-- Prénom: string (nullable = true)
 |-- Âge: integer (nullable = true)
 |-- Ville: string (nullable = true)
 |-- Cohorte: string (nullable = true)

+---------+---+----------+---------+
|   Prénom|Âge|     Ville|  Cohorte|
+---------+---+----------+---------+
|Alexandre| 28|    Chatou| Décembre|
|   Damien| 24|Courbevoie| Novembre|
|  Dimitri| 25|     Sibiu|  Octobre|
|    Sacha| 32|    Poissy|Septembre|
|   Victor| 21| Montrouge|     Juin|
+---------+---+----------+---------+



In [50]:
df_json = spark.read.option("multiLine", True).json("json_comma.json")
df_json.show()
df_json.printSchema()

+---------+---------+----------+---+
|  Cohorte|   Prénom|     Ville|Âge|
+---------+---------+----------+---+
| Décembre|Alexandre|    Chatou| 28|
| Novembre|   Damien|Courbevoie| 24|
|  Octobre|  Dimitri|     Sibiu| 25|
|Septembre|    Sacha|    Poissy| 32|
|     Juin|   Victor| Montrouge| 21|
+---------+---------+----------+---+

root
 |-- Cohorte: string (nullable = true)
 |-- Prénom: string (nullable = true)
 |-- Ville: string (nullable = true)
 |-- Âge: long (nullable = true)



In [51]:
df_json_schema = spark.read.schema(schema).option("multiLine", True).json("json_comma.json")
df_json_schema.show()
df_json_schema.printSchema()

+----+---------+----+----+------+
|    |FirstName| Age|City|Cohort|
+----+---------+----+----+------+
|NULL|     NULL|NULL|NULL|  NULL|
|NULL|     NULL|NULL|NULL|  NULL|
|NULL|     NULL|NULL|NULL|  NULL|
|NULL|     NULL|NULL|NULL|  NULL|
|NULL|     NULL|NULL|NULL|  NULL|
+----+---------+----+----+------+

root
 |-- : integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Cohort: string (nullable = true)



In [52]:
df_gps = spark.read.option("header", True).option("interSchema", True).option("escape", "\"").csv("gps_app.csv")
df_gps.show()
df_gps.printSchema()


+--------------------+--------------+------+-------+----+-----------+----+-----+--------------+--------------------+------------------+------------------+------------+
|                 App|      Category|Rating|Reviews|Size|   Installs|Type|Price|Content Rating|              Genres|      Last Updated|       Current Ver| Android Ver|
+--------------------+--------------+------+-------+----+-----------+----+-----+--------------+--------------------+------------------+------------------+------------+
|Photo Editor & Ca...|ART_AND_DESIGN|   4.1|    159| 19M|    10,000+|Free|    0|      Everyone|        Art & Design|   January 7, 2018|             1.0.0|4.0.3 and up|
| Coloring book moana|ART_AND_DESIGN|   3.9|    967| 14M|   500,000+|Free|    0|      Everyone|Art & Design;Pret...|  January 15, 2018|             2.0.0|4.0.3 and up|
|U Launcher Lite –...|ART_AND_DESIGN|   4.7|  87510|8.7M| 5,000,000+|Free|    0|      Everyone|        Art & Design|    August 1, 2018|             1.2.4|4.0.3 

In [53]:
df_csv.show(5)
df_json.show(5)
df_gps.show(5)


+---------+---+----------+---------+
|   Prénom|Âge|     Ville|  Cohorte|
+---------+---+----------+---------+
|Alexandre| 28|    Chatou| Décembre|
|   Damien| 24|Courbevoie| Novembre|
|  Dimitri| 25|     Sibiu|  Octobre|
|    Sacha| 32|    Poissy|Septembre|
|   Victor| 21| Montrouge|     Juin|
+---------+---+----------+---------+

+---------+---------+----------+---+
|  Cohorte|   Prénom|     Ville|Âge|
+---------+---------+----------+---+
| Décembre|Alexandre|    Chatou| 28|
| Novembre|   Damien|Courbevoie| 24|
|  Octobre|  Dimitri|     Sibiu| 25|
|Septembre|    Sacha|    Poissy| 32|
|     Juin|   Victor| Montrouge| 21|
+---------+---------+----------+---+

+--------------------+--------------+------+-------+----+-----------+----+-----+--------------+--------------------+----------------+------------------+------------+
|                 App|      Category|Rating|Reviews|Size|   Installs|Type|Price|Content Rating|              Genres|    Last Updated|       Current Ver| Android Ver|


In [54]:
print(df_csv.head(5))
print(df_csv.tail(5))
print(df_csv.first())


[Row(Prénom='Alexandre', Âge=28, Ville='Chatou', Cohorte='Décembre'), Row(Prénom='Damien', Âge=24, Ville='Courbevoie', Cohorte='Novembre'), Row(Prénom='Dimitri', Âge=25, Ville='Sibiu', Cohorte='Octobre'), Row(Prénom='Sacha', Âge=32, Ville='Poissy', Cohorte='Septembre'), Row(Prénom='Victor', Âge=21, Ville='Montrouge', Cohorte='Juin')]
[Row(Prénom='Alexandre', Âge=28, Ville='Chatou', Cohorte='Décembre'), Row(Prénom='Damien', Âge=24, Ville='Courbevoie', Cohorte='Novembre'), Row(Prénom='Dimitri', Âge=25, Ville='Sibiu', Cohorte='Octobre'), Row(Prénom='Sacha', Âge=32, Ville='Poissy', Cohorte='Septembre'), Row(Prénom='Victor', Âge=21, Ville='Montrouge', Cohorte='Juin')]
Row(Prénom='Alexandre', Âge=28, Ville='Chatou', Cohorte='Décembre')


In [55]:
print(df_json.head(5))
print(df_json.tail(5))
print(df_json.first())


[Row(Cohorte='Décembre', Prénom='Alexandre', Ville='Chatou', Âge=28), Row(Cohorte='Novembre', Prénom='Damien', Ville='Courbevoie', Âge=24), Row(Cohorte='Octobre', Prénom='Dimitri', Ville='Sibiu', Âge=25), Row(Cohorte='Septembre', Prénom='Sacha', Ville='Poissy', Âge=32), Row(Cohorte='Juin', Prénom='Victor', Ville='Montrouge', Âge=21)]
[Row(Cohorte='Décembre', Prénom='Alexandre', Ville='Chatou', Âge=28), Row(Cohorte='Novembre', Prénom='Damien', Ville='Courbevoie', Âge=24), Row(Cohorte='Octobre', Prénom='Dimitri', Ville='Sibiu', Âge=25), Row(Cohorte='Septembre', Prénom='Sacha', Ville='Poissy', Âge=32), Row(Cohorte='Juin', Prénom='Victor', Ville='Montrouge', Âge=21)]
Row(Cohorte='Décembre', Prénom='Alexandre', Ville='Chatou', Âge=28)


In [56]:
print(df_gps.head(5))
print(df_gps.tail(5))
print(df_gps.first())


[Row(App='Photo Editor & Candy Camera & Grid & ScrapBook', Category='ART_AND_DESIGN', Rating='4.1', Reviews='159', Size='19M', Installs='10,000+', Type='Free', Price='0', Content Rating='Everyone', Genres='Art & Design', Last Updated='January 7, 2018', Current Ver='1.0.0', Android Ver='4.0.3 and up'), Row(App='Coloring book moana', Category='ART_AND_DESIGN', Rating='3.9', Reviews='967', Size='14M', Installs='500,000+', Type='Free', Price='0', Content Rating='Everyone', Genres='Art & Design;Pretend Play', Last Updated='January 15, 2018', Current Ver='2.0.0', Android Ver='4.0.3 and up'), Row(App='U Launcher Lite – FREE Live Cool Themes, Hide Apps', Category='ART_AND_DESIGN', Rating='4.7', Reviews='87510', Size='8.7M', Installs='5,000,000+', Type='Free', Price='0', Content Rating='Everyone', Genres='Art & Design', Last Updated='August 1, 2018', Current Ver='1.2.4', Android Ver='4.0.3 and up'), Row(App='Sketch - Draw & Paint', Category='ART_AND_DESIGN', Rating='4.5', Reviews='215644', Size

In [57]:
length = 1000
fraction = length/df.count()
print(fraction)

200.0


In [58]:
df_subset = df_csv.sample(True, 0.1)
df_subset.distinct().show()

+------+---+-----+-------+
|Prénom|Âge|Ville|Cohorte|
+------+---+-----+-------+
+------+---+-----+-------+



In [59]:
df_gps.describe(["Reviews"]).show()


+-------+------------------+
|summary|           Reviews|
+-------+------------------+
|  count|             10841|
|   mean|444152.89603321033|
| stddev|2927760.6038856646|
|    min|                 0|
|    max|              9992|
+-------+------------------+



In [60]:
from pyspark.sql.functions import *


df_gps.select("App").show()
df_gps.select(df_gps["App"]).show()
df_gps.select(col("App")).show()
df_gps.select(4 * col("Rating")).show()

+--------------------+
|                 App|
+--------------------+
|Photo Editor & Ca...|
| Coloring book moana|
|U Launcher Lite –...|
|Sketch - Draw & P...|
|Pixel Draw - Numb...|
|Paper flowers ins...|
|Smoke Effect Phot...|
|    Infinite Painter|
|Garden Coloring Book|
|Kids Paint Free -...|
|Text on Photo - F...|
|Name Art Photo Ed...|
|Tattoo Name On My...|
|Mandala Coloring ...|
|3D Color Pixel by...|
|Learn To Draw Kaw...|
|Photo Designer - ...|
|350 Diy Room Deco...|
|FlipaClip - Carto...|
|        ibis Paint X|
+--------------------+
only showing top 20 rows

+--------------------+
|                 App|
+--------------------+
|Photo Editor & Ca...|
| Coloring book moana|
|U Launcher Lite –...|
|Sketch - Draw & P...|
|Pixel Draw - Numb...|
|Paper flowers ins...|
|Smoke Effect Phot...|
|    Infinite Painter|
|Garden Coloring Book|
|Kids Paint Free -...|
|Text on Photo - F...|
|Name Art Photo Ed...|
|Tattoo Name On My...|
|Mandala Coloring ...|
|3D Color Pixel by...|
|Learn T

In [61]:
#from pyspark.sql.functions import asc, desc

L_sort = [[5, "Bob"], [2, "Alice"], [5, "Mary"], [2, "Nicolai"]]
rdd_sort = sc.parallelize(L_sort)
df_sort = rdd_sort.toDF(["age", "name"])

df_sort.orderBy(["age", "name"], Ascending=[1, 0]).show()
df_sort.orderBy(asc("age"), desc("name")).show()


+---+-------+
|age|   name|
+---+-------+
|  2|  Alice|
|  2|Nicolai|
|  5|    Bob|
|  5|   Mary|
+---+-------+

+---+-------+
|age|   name|
+---+-------+
|  2|Nicolai|
|  2|  Alice|
|  5|   Mary|
|  5|    Bob|
+---+-------+



In [62]:
df_gps_20 = df_gps.withColumn("Rating/20", 4 * col("Rating"))
df_gps_20.show()


+--------------------+--------------+------+-------+----+-----------+----+-----+--------------+--------------------+------------------+------------------+------------+---------+
|                 App|      Category|Rating|Reviews|Size|   Installs|Type|Price|Content Rating|              Genres|      Last Updated|       Current Ver| Android Ver|Rating/20|
+--------------------+--------------+------+-------+----+-----------+----+-----+--------------+--------------------+------------------+------------------+------------+---------+
|Photo Editor & Ca...|ART_AND_DESIGN|   4.1|    159| 19M|    10,000+|Free|    0|      Everyone|        Art & Design|   January 7, 2018|             1.0.0|4.0.3 and up|     16.4|
| Coloring book moana|ART_AND_DESIGN|   3.9|    967| 14M|   500,000+|Free|    0|      Everyone|Art & Design;Pret...|  January 15, 2018|             2.0.0|4.0.3 and up|     15.6|
|U Launcher Lite –...|ART_AND_DESIGN|   4.7|  87510|8.7M| 5,000,000+|Free|    0|      Everyone|        Art & D

In [63]:
df_gps_renamed = df_gps_20.withColumnRenamed("Reviews", "Nbr of Reviews")
df_gps_renamed.show()


+--------------------+--------------+------+--------------+----+-----------+----+-----+--------------+--------------------+------------------+------------------+------------+---------+
|                 App|      Category|Rating|Nbr of Reviews|Size|   Installs|Type|Price|Content Rating|              Genres|      Last Updated|       Current Ver| Android Ver|Rating/20|
+--------------------+--------------+------+--------------+----+-----------+----+-----+--------------+--------------------+------------------+------------------+------------+---------+
|Photo Editor & Ca...|ART_AND_DESIGN|   4.1|           159| 19M|    10,000+|Free|    0|      Everyone|        Art & Design|   January 7, 2018|             1.0.0|4.0.3 and up|     16.4|
| Coloring book moana|ART_AND_DESIGN|   3.9|           967| 14M|   500,000+|Free|    0|      Everyone|Art & Design;Pret...|  January 15, 2018|             2.0.0|4.0.3 and up|     15.6|
|U Launcher Lite –...|ART_AND_DESIGN|   4.7|         87510|8.7M| 5,000,000+

In [64]:
df_gps_renamed.where(col("Rating/20") > 15).show()


+--------------------+--------------+------+--------------+----+-----------+----+-----+--------------+--------------------+------------------+------------------+------------+---------+
|                 App|      Category|Rating|Nbr of Reviews|Size|   Installs|Type|Price|Content Rating|              Genres|      Last Updated|       Current Ver| Android Ver|Rating/20|
+--------------------+--------------+------+--------------+----+-----------+----+-----+--------------+--------------------+------------------+------------------+------------+---------+
|Photo Editor & Ca...|ART_AND_DESIGN|   4.1|           159| 19M|    10,000+|Free|    0|      Everyone|        Art & Design|   January 7, 2018|             1.0.0|4.0.3 and up|     16.4|
| Coloring book moana|ART_AND_DESIGN|   3.9|           967| 14M|   500,000+|Free|    0|      Everyone|Art & Design;Pret...|  January 15, 2018|             2.0.0|4.0.3 and up|     15.6|
|U Launcher Lite –...|ART_AND_DESIGN|   4.7|         87510|8.7M| 5,000,000+

In [65]:
df_gps_renamed.where((col("Rating/20") > 15) | (col("Rating/20") < 5)).show()


+--------------------+--------------+------+--------------+----+-----------+----+-----+--------------+--------------------+------------------+------------------+------------+---------+
|                 App|      Category|Rating|Nbr of Reviews|Size|   Installs|Type|Price|Content Rating|              Genres|      Last Updated|       Current Ver| Android Ver|Rating/20|
+--------------------+--------------+------+--------------+----+-----------+----+-----+--------------+--------------------+------------------+------------------+------------+---------+
|Photo Editor & Ca...|ART_AND_DESIGN|   4.1|           159| 19M|    10,000+|Free|    0|      Everyone|        Art & Design|   January 7, 2018|             1.0.0|4.0.3 and up|     16.4|
| Coloring book moana|ART_AND_DESIGN|   3.9|           967| 14M|   500,000+|Free|    0|      Everyone|Art & Design;Pret...|  January 15, 2018|             2.0.0|4.0.3 and up|     15.6|
|U Launcher Lite –...|ART_AND_DESIGN|   4.7|         87510|8.7M| 5,000,000+

In [66]:
df_gps_renamed.select(
    when(col("Rating/20") >= 18, "Outstanding")
    .when(col("Rating/20") >= 16, "Excellent")
    .when(col("Rating/20") >= 14, "Very Good")
    .when(col("Rating/20") >= 12, "Good")
    .when(col("Rating/20") >= 10, "Fair")
    .otherwise("bad").alias("appreciation"), "Rating/20"
).show()

+------------+---------+
|appreciation|Rating/20|
+------------+---------+
|   Excellent|     16.4|
|   Very Good|     15.6|
| Outstanding|     18.8|
| Outstanding|     18.0|
|   Excellent|     17.2|
|   Excellent|     17.6|
|   Very Good|     15.2|
|   Excellent|     16.4|
|   Excellent|     17.6|
| Outstanding|     18.8|
|   Excellent|     17.6|
|   Excellent|     17.6|
|   Excellent|     16.8|
| Outstanding|     18.4|
|   Excellent|     17.6|
|        Good|     12.8|
| Outstanding|     18.8|
| Outstanding|     18.0|
|   Excellent|     17.2|
| Outstanding|     18.4|
+------------+---------+
only showing top 20 rows



In [67]:
df_gps_renamed.select(
    when(col("Rating/20") >= 10, "Fair")
    .when(col("Rating/20") >= 12, "Good")
    .when(col("Rating/20") >= 14, "Very Good")
    .when(col("Rating/20") >= 16, "Excellent")
    .when(col("Rating/20") >= 18, "Outstanding")
    .otherwise("Bad").alias("appreciation"), "Rating/20"
).show()

+------------+---------+
|appreciation|Rating/20|
+------------+---------+
|        Fair|     16.4|
|        Fair|     15.6|
|        Fair|     18.8|
|        Fair|     18.0|
|        Fair|     17.2|
|        Fair|     17.6|
|        Fair|     15.2|
|        Fair|     16.4|
|        Fair|     17.6|
|        Fair|     18.8|
|        Fair|     17.6|
|        Fair|     17.6|
|        Fair|     16.8|
|        Fair|     18.4|
|        Fair|     17.6|
|        Fair|     12.8|
|        Fair|     18.8|
|        Fair|     18.0|
|        Fair|     17.2|
|        Fair|     18.4|
+------------+---------+
only showing top 20 rows



In [68]:
df_gps_used = df_gps_renamed.withColumn("used", when(col("Nbr of Reviews") >= 10000, True)
                                        .otherwise(False))
df_gps_used.show()

+--------------------+--------------+------+--------------+----+-----------+----+-----+--------------+--------------------+------------------+------------------+------------+---------+-----+
|                 App|      Category|Rating|Nbr of Reviews|Size|   Installs|Type|Price|Content Rating|              Genres|      Last Updated|       Current Ver| Android Ver|Rating/20| used|
+--------------------+--------------+------+--------------+----+-----------+----+-----+--------------+--------------------+------------------+------------------+------------+---------+-----+
|Photo Editor & Ca...|ART_AND_DESIGN|   4.1|           159| 19M|    10,000+|Free|    0|      Everyone|        Art & Design|   January 7, 2018|             1.0.0|4.0.3 and up|     16.4|false|
| Coloring book moana|ART_AND_DESIGN|   3.9|           967| 14M|   500,000+|Free|    0|      Everyone|Art & Design;Pret...|  January 15, 2018|             2.0.0|4.0.3 and up|     15.6|false|
|U Launcher Lite –...|ART_AND_DESIGN|   4.7| 

In [69]:
count_original = df_gps_used.count()
count_distinct = df_gps_used.distinct().count()
count_duplicates = count_original - count_distinct
print(count_original, count_distinct, count_duplicates)

10841 10358 483


In [70]:
df_unique = df_gps_used.dropDuplicates(['App'])
count_app_distinct = df_unique.count()
print(count_original, count_distinct, count_app_distinct)

10841 10358 9660


In [71]:
!pip install numpy



In [72]:
import numpy as np
from pyspark.sql.functions import isnan, isnull, col
from pyspark.sql.types import BooleanType


def getMissingValues(dataframe):
    count = dataframe.count()
    columns = dataframe.columns
    nan_count = []
    # we can't check for nan in a boolean type column
    for column in columns:
        if dataframe.schema[column].dataType == BooleanType():
            nan_count.append(0)
        else:
            nan_count.append(dataframe.where(isnan(col(column))).count())
    null_count = [dataframe.where(isnull(col(column))).count() for column in columns]
    return([count, columns, nan_count, null_count])    

def missingTables(stats):
    count, columns, nan_count, null_count = stats
    count = str(count)
    nan_count = [str(element) for element in nan_count]
    null_count = [str(element) for element in null_count]
    max_init = np.max([len(str(count)), 10])
    line1 = "+" + max_init*"-" + "+"
    line2 = "|" + (max_init-len(count))*" " + count + "|"
    line3 = "|" + (max_init-9)*" " + "nan count|"
    line4 = "|" + (max_init-10)*" " + "null count|"
    for i in range(len(columns)):
        max_column = np.max([len(columns[i]), len(nan_count[i]), len(null_count[i])])
        line1 += max_column*"-" + "+"
        line2 += (max_column - len(columns[i]))*" " + columns[i] + "|"
        line3 += (max_column - len(nan_count[i]))*" " + nan_count[i] + "|"
        line4 += (max_column - len(null_count[i]))*" " + null_count[i] + "|"
    lines = f"{line1}\n{line2}\n{line1}\n{line3}\n{line4}\n{line1}"
    print(lines)

missingTables(getMissingValues(df_unique))

+----------+---+--------+------+--------------+----+--------+----+-----+--------------+------+------------+-----------+-----------+---------+----+
|      9660|App|Category|Rating|Nbr of Reviews|Size|Installs|Type|Price|Content Rating|Genres|Last Updated|Current Ver|Android Ver|Rating/20|used|
+----------+---+--------+------+--------------+----+--------+----+-----+--------------+------+------------+-----------+-----------+---------+----+
| nan count|  0|       0|  1463|             0|   0|       0|   1|    0|             0|     0|           0|          7|          2|     1463|   0|
|null count|  0|       0|     0|             0|   0|       0|   0|    0|             1|     0|           0|          1|          1|        0|   0|
+----------+---+--------+------+--------------+----+--------+----+-----+--------------+------+------------+-----------+-----------+---------+----+


In [75]:
df_clean = df_unique.drop("Rating")\
                    .dropna(subset=["Type", "Content Rating", "Android Ver"])
missingTables(getMissingValues(df_clean))

+----------+---+--------+--------------+----+--------+----+-----+--------------+------+------------+-----------+-----------+---------+----+
|      9659|App|Category|Nbr of Reviews|Size|Installs|Type|Price|Content Rating|Genres|Last Updated|Current Ver|Android Ver|Rating/20|used|
+----------+---+--------+--------------+----+--------+----+-----+--------------+------+------------+-----------+-----------+---------+----+
| nan count|  0|       0|             0|   0|       0|   1|    0|             0|     0|           0|          7|          2|     1463|   0|
|null count|  0|       0|             0|   0|       0|   0|    0|             0|     0|           0|          1|          0|        0|   0|
+----------+---+--------+--------------+----+--------+----+-----+--------------+------+------------+-----------+-----------+---------+----+


In [76]:
df_clean = df_clean.filter(~isnan("Type") & ~isnan("Android Ver"))
missingTables(getMissingValues(df_clean))

+----------+---+--------+--------------+----+--------+----+-----+--------------+------+------------+-----------+-----------+---------+----+
|      9656|App|Category|Nbr of Reviews|Size|Installs|Type|Price|Content Rating|Genres|Last Updated|Current Ver|Android Ver|Rating/20|used|
+----------+---+--------+--------------+----+--------+----+-----+--------------+------+------------+-----------+-----------+---------+----+
| nan count|  0|       0|             0|   0|       0|   0|    0|             0|     0|           0|          7|          0|     1462|   0|
|null count|  0|       0|             0|   0|       0|   0|    0|             0|     0|           0|          1|          0|        0|   0|
+----------+---+--------+--------------+----+--------+----+-----+--------------+------+------------+-----------+-----------+---------+----+


In [77]:
# We start by computing the average, be careful not to keep missing values
rating_average_df = df_clean.filter(~isnan("Rating/20")).select(avg("Rating/20"))

# Then we extract the corresponding float by using the head method
rating_average = rating_average_df.head()["avg(Rating/20)"]

# Now we can fill the missing values of the Rating/20 column
df_nomiss = df_clean.fillna({"Rating/20": rating_average})

# We check to see if we have remaing missing values
missingTables(getMissingValues(df_nomiss))

+----------+---+--------+--------------+----+--------+----+-----+--------------+------+------------+-----------+-----------+---------+----+
|      9656|App|Category|Nbr of Reviews|Size|Installs|Type|Price|Content Rating|Genres|Last Updated|Current Ver|Android Ver|Rating/20|used|
+----------+---+--------+--------------+----+--------+----+-----+--------------+------+------------+-----------+-----------+---------+----+
| nan count|  0|       0|             0|   0|       0|   0|    0|             0|     0|           0|          7|          0|        0|   0|
|null count|  0|       0|             0|   0|       0|   0|    0|             0|     0|           0|          1|          0|        0|   0|
+----------+---+--------+--------------+----+--------+----+-----+--------------+------+------------+-----------+-----------+---------+----+


In [78]:
grouped_ver = df_nomiss.groupBy("Current Ver").count().sort("count", ascending=False)
mode_ver = grouped_ver.head()["Current Ver"]
print(mode_ver)

df_final = df_nomiss.withColumn("Current Ver", when(isnan("Current Ver") | isnull("Current Ver"), mode_ver)\
                                              .otherwise(col("Current Ver")))
missingTables(getMissingValues(df_final))

Varies with device
+----------+---+--------+--------------+----+--------+----+-----+--------------+------+------------+-----------+-----------+---------+----+
|      9656|App|Category|Nbr of Reviews|Size|Installs|Type|Price|Content Rating|Genres|Last Updated|Current Ver|Android Ver|Rating/20|used|
+----------+---+--------+--------------+----+--------+----+-----+--------------+------+------------+-----------+-----------+---------+----+
| nan count|  0|       0|             0|   0|       0|   0|    0|             0|     0|           0|          0|          0|        0|   0|
|null count|  0|       0|             0|   0|       0|   0|    0|             0|     0|           0|          0|          0|        0|   0|
+----------+---+--------+--------------+----+--------+----+-----+--------------+------+------------+-----------+-----------+---------+----+


In [79]:
df_final.groupBy("Category").agg({"Genres": "mode", "Rating/20": "mean"}).show()


+-------------------+------------------+--------------------+
|           Category|    avg(Rating/20)|        mode(Genres)|
+-------------------+------------------+--------------------+
|             EVENTS|17.430645899438602|              Events|
|             SPORTS|16.830232703103583|              Sports|
|             COMICS|16.724739356323443|              Comics|
|            WEATHER| 16.94745460556195|             Weather|
|      VIDEO_PLAYERS|16.225708770894755|Video Players & E...|
|  AUTO_AND_VEHICLES| 16.75191086734913|     Auto & Vehicles|
|          PARENTING| 17.11545032950939|           Parenting|
|      ENTERTAINMENT| 16.54117647058823|       Entertainment|
|    PERSONALIZATION|17.193665118209623|     Personalization|
| HEALTH_AND_FITNESS| 16.92944057982804|    Health & Fitness|
|   TRAVEL_AND_LOCAL|16.338659649615547|      Travel & Local|
|BOOKS_AND_REFERENCE|17.215825246774724|   Books & Reference|
|     FOOD_AND_DRINK|16.689898532026906|        Food & Drink|
|       

In [80]:
from pyspark.sql.functions import col, count

def return_duplicates(df):
    return(df.groupBy(df.columns).agg(count("*").alias("duplicates")).filter(col("duplicates") >= 2))

return_duplicates(df).sort("duplicates", ascending=False).show()

+---+---------+---+----+------+----------+
|   |FirstName|Age|City|Cohort|duplicates|
+---+---------+---+----+------+----------+
+---+---------+---+----+------+----------+



In [82]:
df_user = spark.read.option("header", True)\
                    .option("inferSchema", True)\
                    .option("escape", "\"")\
                    .csv("gps_user.csv")

df_joined = df_final.join(df_user, "App")
df_joined.filter(~isnan("Sentiment_Polarity") & ~isnull("Sentiment_Polarity"))\
         .select("App", col("Sentiment_Polarity").cast("double"))\
         .groupBy("App")\
         .avg("Sentiment_Polarity")\
         .sort("avg(Sentiment_Polarity)", ascending=False)\
         .show()

+--------------------+-----------------------+
|                 App|avg(Sentiment_Polarity)|
+--------------------+-----------------------+
|            HomeWork|                    1.0|
|       Google Slides|     0.9333333333333332|
|Daily Workouts - ...|                    0.8|
|Bed Time Fan - Wh...|                0.78125|
|Cameringo Lite. F...|     0.7702690972222221|
|       Google Primer|                   0.75|
|        GPS Map Free|                    0.7|
|GPS Speedometer a...|                 0.6875|
|Best Ovulation Tr...|              0.5953125|
|3D Live Neon Weed...|     0.5681818181818181|
|    Cookbook Recipes|     0.5654370380932882|
|CM Flashlight (Co...|                0.55625|
|Goldstar: Live Ev...|     0.5492664141414142|
|Hipmunk Hotels & ...|     0.5463015873015874|
|850 Sports News D...|     0.5428597480561768|
|       C++ Tutorials|     0.5309259259259259|
|Haystack TV: Loca...|                 0.5295|
|All Football GO- ...|     0.5265151515151514|
|Home Workout

In [83]:
df_final.createOrReplaceTempView("APPS")

In [84]:
spark.sql("SELECT count(*) as freq, `Content Rating` FROM APPS GROUP BY `Content Rating` ORDER BY freq DESC").show()

+----+---------------+
|freq| Content Rating|
+----+---------------+
|7901|       Everyone|
|1036|           Teen|
| 393|     Mature 17+|
| 321|   Everyone 10+|
|   3|Adults only 18+|
|   2|        Unrated|
+----+---------------+



In [85]:
spark.sql("SELECT * FROM APPS WHERE `Content Rating`='Adults only 18+'").show()

+--------------------+--------+--------------+------------------+----------+----+-----+---------------+------+--------------+------------------+------------------+---------+-----+
|                 App|Category|Nbr of Reviews|              Size|  Installs|Type|Price| Content Rating|Genres|  Last Updated|       Current Ver|       Android Ver|Rating/20| used|
+--------------------+--------+--------------+------------------+----------+----+-----+---------------+------+--------------+------------------+------------------+---------+-----+
|DraftKings - Dail...|  SPORTS|         50017|               41M|1,000,000+|Free|    0|Adults only 18+|Sports| July 24, 2018|          3.21.324|        4.4 and up|     18.0| true|
|         Manga Books|  COMICS|          7326|Varies with device|  500,000+|Free|    0|Adults only 18+|Comics|August 3, 2018|Varies with device|Varies with device|     15.2|false|
|Manga Master - Be...|  COMICS|         24005|              4.9M|  500,000+|Free|    0|Adults only 1

In [86]:
spark.sql(""" 
    SELECT count(*) as `good apps`, Category
    FROM APPS
    WHERE `Rating/20` > 15
    GROUP BY CATEGORY
    HAVING `good apps` > 400
    ORDER BY `good apps` DESC
""").show()

+---------+--------+
|good apps|Category|
+---------+--------+
|     1590|  FAMILY|
|      886|    GAME|
|      662|   TOOLS|
+---------+--------+



In [87]:
def clean_installs(installs):
    return installs[:-1]

spark.udf.register("CLEAN_INSTALLS", clean_installs, StringType())
spark.sql("SELECT CLEAN_INSTALLS(Installs) FROM APPS").show()

+------------------------+
|CLEAN_INSTALLS(Installs)|
+------------------------+
|                     500|
|               1,000,000|
|                  10,000|
|                  10,000|
|                  10,000|
|                     100|
|                 100,000|
|                 500,000|
|                 100,000|
|                     100|
|                     500|
|              10,000,000|
|                   5,000|
|                  10,000|
|                  50,000|
|               1,000,000|
|              10,000,000|
|                  50,000|
|                       5|
|                   1,000|
+------------------------+
only showing top 20 rows



In [88]:
from pyspark.sql.functions import udf


clean_installsUDF = udf(clean_installs, StringType())
df_final.select(clean_installsUDF(col("Installs"))).show()

+------------------------+
|clean_installs(Installs)|
+------------------------+
|                     500|
|               1,000,000|
|                  10,000|
|                  10,000|
|                  10,000|
|                     100|
|                 100,000|
|                 500,000|
|                 100,000|
|                     100|
|                     500|
|              10,000,000|
|                   5,000|
|                  10,000|
|                  50,000|
|               1,000,000|
|              10,000,000|
|                  50,000|
|                       5|
|                   1,000|
+------------------------+
only showing top 20 rows

