# Spark - Part 3 - DataFrames and SQL

Based on the tutorial: https://datascience-school.com/blog/practical-apache-spark-in-10-minutes.-part-3-dataframes-and-sql/

## Create Spark instance

In [3]:
import pyspark 
from pyspark.sql import SQLContext
# Reuse the running SparkContext when this cell is executed again:
# constructing a second SparkContext in the same kernel raises
# "ValueError: Cannot run multiple SparkContexts at once".
# The master setting only takes effect when a new context is created.
spark_conf = pyspark.SparkConf().setMaster('local[*]')
sc = pyspark.SparkContext.getOrCreate(spark_conf)
sqlContext = SQLContext(sc)

Simple test:

In [4]:
# Smoke test: distribute 0..999 and draw 5 values without replacement.
# A fixed seed makes the draw repeatable across runs on the same setup
# (the exact values may still depend on how the data is partitioned).
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5, seed=42)

[793, 160, 681, 339, 787]

## DataFrames

In [27]:
# Read the movies CSV: comma-delimited, first row is the header,
# and column types are inferred from the data.
movies = (
    sqlContext.read
    .format('csv')
    .options(delimiter=",", header='true', inferschema='true')
    .load('data/movies.csv')
)

In [28]:
# Preview the first five rows of the movies DataFrame.
movies.show(n=5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [29]:
# Project only the title and genres columns and preview three rows.
movies.select(["title", "genres"]).show(n=3)

+--------------------+--------------------+
|               title|              genres|
+--------------------+--------------------+
|    Toy Story (1995)|Adventure|Animati...|
|      Jumanji (1995)|Adventure|Childre...|
|Grumpier Old Men ...|      Comedy|Romance|
+--------------------+--------------------+
only showing top 3 rows



In [30]:
# Read the ratings CSV with the same settings as the movies file:
# comma-delimited, header row present, column types inferred.
ratings = (
    sqlContext.read
    .format('csv')
    .options(delimiter=",", header='true', inferschema='true')
    .load('data/ratings.csv')
)

In [32]:
# Preview the first five rows of the ratings DataFrame.
ratings.show(n=5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [35]:
# Summary statistics (count, mean, stddev, min, max) for the rating column.
ratings.describe(["rating"]).show()

+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|            100836|
|   mean| 3.501556983616962|
| stddev|1.0425292390606342|
|    min|               0.5|
|    max|               5.0|
+-------+------------------+



In [33]:
# Keep only ratings strictly below 3.0 and preview three of them.
ratings.where(ratings.rating < 3.0).show(n=3)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|   1219|   2.0|964983393|
|     1|   2253|   2.0|964981775|
|     1|   2338|   2.0|964983546|
+------+-------+------+---------+
only showing top 3 rows



In [34]:
# Count how many ratings fall on each rating value, lowest rating first.
(
    ratings
    .groupBy('rating')
    .count()
    .sort('rating')
    .show()
)

+------+-----+
|rating|count|
+------+-----+
|   0.5| 1370|
|   1.0| 2811|
|   1.5| 1791|
|   2.0| 7551|
|   2.5| 5550|
|   3.0|20047|
|   3.5|13136|
|   4.0|26818|
|   4.5| 8551|
|   5.0|13211|
+------+-----+



### SQL

In [38]:
# Register the ratings DataFrame as a temporary view named "movielens"
# so it can be queried with SQL below.
ratings.createOrReplaceTempView("movielens")

In [39]:
# SQL equivalent of the DataFrame filter on rating shown earlier.
low_rating_query = "select * from movielens where rating < 3"
sqlContext.sql(low_rating_query).show(n=3)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|   1219|   2.0|964983393|
|     1|   2253|   2.0|964981775|
|     1|   2338|   2.0|964983546|
+------+-------+------+---------+
only showing top 3 rows



## DataFrame from RDD

Create RDD

In [54]:
def _parse_movie_line(line):
    """Parse one comma-separated line into an ``(id, title, genres)`` tuple.

    Uses ``csv.reader`` rather than ``str.split(",")`` so that a quoted
    field containing a comma (e.g. ``"American President, The (1995)"``)
    stays in one piece instead of being cut into two fields.
    Lines without quotes parse exactly as a plain split would.

    Raises ``ValueError`` if the first field is not an integer and
    ``IndexError`` if the line has fewer than three fields.
    """
    import csv  # local import so the function is self-contained on workers

    fields = next(csv.reader([line]))
    return (int(fields[0]), fields[1], fields[2])


# NOTE(review): assumes data/movielens.txt uses standard CSV quoting — confirm.
rdd = sc.textFile("data/movielens.txt").map(_parse_movie_line)

In [55]:
# Pull the first five parsed records to the driver and print one per line.
first_records = rdd.take(5)
for record in first_records:
    print(record)

(1, 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy')
(2, 'Jumanji (1995)', 'Adventure|Children|Fantasy')
(3, 'Grumpier Old Men (1995)', 'Comedy|Romance')
(4, 'Waiting to Exhale (1995)', 'Comedy|Drama|Romance')
(5, 'Father of the Bride Part II (1995)', 'Comedy')


In [57]:
from pyspark.sql.types import *

# One nullable field per tuple position produced by the RDD: (id, title, genres).
id_field = StructField(name="id", dataType=IntegerType(), nullable=True)
title_field = StructField(name="title", dataType=StringType(), nullable=True)
genres_field = StructField(name="genres", dataType=StringType(), nullable=True)

# Field order here must match the order of values in each RDD tuple.
schema = StructType(fields=[id_field, title_field, genres_field])

Convert the RDD to a DataFrame

In [59]:
# Apply the explicit schema to the RDD of tuples to obtain a DataFrame.
movielens = sqlContext.createDataFrame(data=rdd, schema=schema)
movielens.show(n=3)

+---+--------------------+--------------------+
| id|               title|              genres|
+---+--------------------+--------------------+
|  1|    Toy Story (1995)|Adventure|Animati...|
|  2|      Jumanji (1995)|Adventure|Childre...|
|  3|Grumpier Old Men ...|      Comedy|Romance|
+---+--------------------+--------------------+
only showing top 3 rows



Convert the DataFrame back to an RDD

In [61]:
# The DataFrame's underlying RDD yields Row objects rather than plain tuples.
movielensRDD = movielens.rdd

first_rows = movielensRDD.take(2)
for row in first_rows:
    print(row)

Row(id=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy')
Row(id=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy')


## JSON files

In [63]:
# Load a JSON dataset; note this rebinds the name `movies`.
movies = sqlContext.read.format("json").load("data/movies.json")

In [65]:
# Expose the DataFrame to SQL under the view name "movies".
movies.createOrReplaceTempView(name="movies")

# Select title and rating for rows whose rating exceeds 4.9.
nice_movies_query = "select title, rating from movies where rating > 4.9"
nice_movies = sqlContext.sql(nice_movies_query)

nice_movies.show(n=5)

+--------------------+------+
|               title|rating|
+--------------------+------+
|L'affaire Gordji,...|   6.3|
|Le naufrage du La...|   6.8|
|Le naufrage du La...|   6.8|
|       Africa United|   6.2|
|           Beginners|   7.2|
+--------------------+------+
only showing top 5 rows



Save the result as JSON

In [73]:
# The default save mode raises an error when the target path already exists,
# so re-running this cell would fail. Overwrite makes the cell idempotent.
nice_movies.write.json("data/nice_movies", mode="overwrite")

## Apache Parquet

Apache Parquet is a popular column-oriented storage format, which is supported by a wide variety of data processing systems. It is often used with tools in the Hadoop ecosystem and supports all of the data types in Spark SQL.

Create Parquet file

In [69]:
# The default save mode raises an error when the target path already exists,
# so re-running this cell would fail. Overwrite makes the cell idempotent.
movies.write.parquet('data/movielens.parquet', mode='overwrite')

Read Parquet file

In [74]:
# Load the Parquet dataset from disk into a DataFrame.
df_parquet = sqlContext.read.format('parquet').load('data/movielens.parquet')

In [75]:
# Make the Parquet-backed DataFrame queryable as the view "parquetlens".
df_parquet.createOrReplaceTempView(name='parquetlens')

# Select title and rating for rows rated between 2 and 5 (inclusive bounds).
just_movies_query = "select title, rating from parquetlens where rating between 2 and 5"
just_movies = sqlContext.sql(just_movies_query)

just_movies.show(n=5)

+--------------------+------+
|               title|rating|
+--------------------+------+
|Bucky Larson : Bo...|   3.0|
|               Krach|   4.3|
|      L'élève Ducobu|   4.6|
|    Piège à Hongkong|   4.3|
|      Street fighter|   3.4|
+--------------------+------+
only showing top 5 rows



## Operations between DataFrames

In [77]:
# Two tiny DataFrames whose key columns (a_id / b_id) share the values a, b, c.
a_rows = [['a', 'foo'], ['b', 'hem'], ['c', 'haw']]
b_rows = [['p1', 'a'], ['p2', 'b'], ['p3', 'c']]

a = sc.parallelize(a_rows).toDF(['a_id', 'extra'])
b = sc.parallelize(b_rows).toDF(["other", "b_id"])

In [79]:
# Display the left-hand DataFrame (columns a_id, extra).
a.show()

+----+-----+
|a_id|extra|
+----+-----+
|   a|  foo|
|   b|  hem|
|   c|  haw|
+----+-----+



In [80]:
# Display the right-hand DataFrame (columns other, b_id).
b.show()

+-----+----+
|other|b_id|
+-----+----+
|   p1|   a|
|   p2|   b|
|   p3|   c|
+-----+----+



In [81]:
# Inner join: keep only rows whose a_id has a matching b_id.
inner_condition = a.a_id == b.b_id
c = a.join(b, on=inner_condition, how='inner')

c.show()

+----+-----+-----+----+
|a_id|extra|other|b_id|
+----+-----+-----+----+
|   c|  haw|   p3|   c|
|   b|  hem|   p2|   b|
|   a|  foo|   p1|   a|
+----+-----+-----+----+



In [82]:
# Left join: keep every row of `a`, attaching the matching row of `b` if any.
left_condition = a.a_id == b.b_id
c = a.join(b, on=left_condition, how='left')

c.show()

+----+-----+-----+----+
|a_id|extra|other|b_id|
+----+-----+-----+----+
|   c|  haw|   p3|   c|
|   b|  hem|   p2|   b|
|   a|  foo|   p1|   a|
+----+-----+-----+----+



In [86]:
# Exclude rows whose `extra` value is 'haw' or 'hem' (negated membership test).
c.where(~c.extra.isin('haw', 'hem')).show()

+----+-----+-----+----+
|a_id|extra|other|b_id|
+----+-----+-----+----+
|   a|  foo|   p1|   a|
+----+-----+-----+----+

