### `Spark Demo`

### What is Spark?
https://spark.apache.org/

Spark is an open-source analytics engine for executing data engineering, data science and machine learning on big data workloads.

In [1]:
# Imports
from IPython.display import display, Markdown
from pyspark.sql import SparkSession, functions as F, types as T

https://spark.apache.org/docs/3.1.1/api/python/getting_started/quickstart.html

In [2]:
spark = SparkSession.builder \
        .master("local[*]") \
        .config("spark.sql.repl.eagerEval.enabled", "true") \
        .appName("mySpark") \
        .getOrCreate()

##### Set config values *after* the session has been created
All configurable parameters are available here: https://spark.apache.org/docs/latest/configuration.html 
<br>In this example: The max number of characters for each cell that is returned by eager evaluation. Eager evaluation must be on.

In [4]:
spark.conf.set("spark.sql.repl.eagerEval.truncate", 512)

In [6]:
# Display Spark UI
spark

#### Importing data into Spark
###### JSON and CSV

In [8]:
path_to_json_file = 'booksdata.json'
path_to_csv_file_1 = 'region.txt'
path_to_csv_file_2 = 'nation.tbl'

In [9]:
# JSON import, a simple case
df_from_json = spark.read.option('multiline', 'true').json(path_to_json_file)

In [13]:
# Import CSV file with schema
df_region = spark.read.csv(path_to_csv_file_1, header=True, sep=';', inferSchema=True)

df_region.show(truncate=True)

+-----------+-----------+--------------------+
|r_regionkey|     r_name|           r_comment|
+-----------+-----------+--------------------+
|          0|     AFRICA|lar deposits. bli...|
|          1|    AMERICA|hs use ironic, ev...|
|          2|       ASIA|ges. thinly even ...|
|          3|     EUROPE|ly final courts c...|
|          4|MIDDLE EAST|uickly special ac...|
+-----------+-----------+--------------------+



##### Inspect data, some useful functions

In [14]:
# List the columns 
display(df_region.columns)

['r_regionkey', 'r_name', 'r_comment']

In [15]:
# Display all the records
df_region.show()

+-----------+-----------+--------------------+
|r_regionkey|     r_name|           r_comment|
+-----------+-----------+--------------------+
|          0|     AFRICA|lar deposits. bli...|
|          1|    AMERICA|hs use ironic, ev...|
|          2|       ASIA|ges. thinly even ...|
|          3|     EUROPE|ly final courts c...|
|          4|MIDDLE EAST|uickly special ac...|
+-----------+-----------+--------------------+



In [16]:
# Show the columns and their data types
df_region.dtypes

[('r_regionkey', 'int'), ('r_name', 'string'), ('r_comment', 'string')]

In [17]:
display(Markdown("#### Close up inspection of the data types"))
display(Markdown("[('r_regionkey', '**int**'), ('r_name', '**string**'), ('r_comment', '**string**')]"))
display(Markdown("#### Re-import the data frame without the `inferSchema=True` option"))

df_region = spark.read.csv(path_to_csv_file_1, header=True, sep=';')
display(Markdown(f"{df_region.dtypes}".replace('string', "**string**")))

#### Close up inspection of the data types

[('r_regionkey', '**int**'), ('r_name', '**string**'), ('r_comment', '**string**')]

#### Re-import the data frame without the `inferSchema=True` option

[('r_regionkey', '**string**'), ('r_name', '**string**'), ('r_comment', '**string**')]

#### Import a schema-less file

In [19]:
nation_columns = ['n_nationkey', 'n_name', 'n_regionkey', 'n_comment']

# Create a new struct type, assign Integer type to all the field with the value 'key' in them
nation_schema = T.StructType(
    [
        T.StructField(c, T.IntegerType() if 'key' in c else T.StringType(), True)
        for c in nation_columns
    ]
)

In [20]:
df_nation = spark.read.csv(path_to_csv_file_2, schema=nation_schema, header=False, sep='|')

In [21]:
# Some other functions to inspect the data
df_nation.first() # return the first row

Row(n_nationkey=0, n_name='ALGERIA', n_regionkey=0, n_comment=' haggle. carefully final deposits detect slyly agai')

In [22]:
df_nation.take(3) # return the first 3 rows

[Row(n_nationkey=0, n_name='ALGERIA', n_regionkey=0, n_comment=' haggle. carefully final deposits detect slyly agai'),
 Row(n_nationkey=1, n_name='ARGENTINA', n_regionkey=1, n_comment='al foxes promise slyly according to the regular accounts. bold requests alon'),
 Row(n_nationkey=2, n_name='BRAZIL', n_regionkey=1, n_comment='y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special ')]

In [23]:
df_nation.head(2) # return the first n rows

[Row(n_nationkey=0, n_name='ALGERIA', n_regionkey=0, n_comment=' haggle. carefully final deposits detect slyly agai'),
 Row(n_nationkey=1, n_name='ARGENTINA', n_regionkey=1, n_comment='al foxes promise slyly according to the regular accounts. bold requests alon')]

In [24]:
# Print the schema 
df_nation.schema

StructType(List(StructField(n_nationkey,IntegerType,true),StructField(n_name,StringType,true),StructField(n_regionkey,IntegerType,true),StructField(n_comment,StringType,true)))

#### Queries using SQL
Spark API: https://spark.apache.org/docs/latest/sql-programming-guide.html

In [25]:
# Display the schema, to have a clear view of the columns in our data frame
df_from_json.printSchema()

root
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- age: long (nullable = true)
 |    |    |-- first: string (nullable = true)
 |    |    |-- last: string (nullable = true)
 |-- date: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- price: double (nullable = true)
 |-- publisher: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- number: string (nullable = true)
 |    |    |-- street: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- phone: string (nullable = true)
 |-- title: string (nullable = true)



#### Before query using SQL, we must register the dataframe as a table

In [26]:
# Display how many rows we are dealing with
df_from_json.count()

1000

In [27]:
# Can be made global as well. Global views are shared among different sessions
df_from_json.createOrReplaceTempView('books')

query = '''
    select title, price
    from books
    where price < 20
    and year(to_date(date, 'M/d/yyyy')) = 2017
    order by price desc
'''

In [28]:
# Run query
spark.sql(query).limit(10)

title,price
The Land Before Time XIII: The Wisdom of Friends,19.94
"Andromeda Strain, The",19.92
Halloweentown,19.91
Ankur (The Seedling),19.88
Tarzan Escapes,19.84
Rembrandt,19.83
Run,19.81
My Bodyguard,19.79
"Cool, Dry Place, A",19.76
"Blood of a Poet, The (Sang d'un poète, Le)",19.71


#### Queries using built-in functions

In [29]:
# Select
df_nation.select("n_name").limit(5)

n_name
ALGERIA
ARGENTINA
BRAZIL
CANADA
EGYPT


In [36]:
# Select, unnesting an array
df_from_json.select("isbn", "title", F.explode("authors.last")).limit(10)

isbn,title,col
846896359-3,"Jungle Book 2, The",Ashley
846896359-3,"Jungle Book 2, The",Finley
846896359-3,"Jungle Book 2, The",Bullus
558973823-7,Star Trek III: The Search for Spock,Ibel
558973823-7,Star Trek III: The Search for Spock,Grasner
558973823-7,Star Trek III: The Search for Spock,Laven
558973823-7,Star Trek III: The Search for Spock,Lapsley
411502148-9,"Verdict, The",Broadist
411502148-9,"Verdict, The",Mugleston
411502148-9,"Verdict, The",Mannock


In [37]:
# Select using a column alias
df_from_json.select("isbn", "title", F.col('date').alias('date_published')).limit(10)

isbn,title,date_published
846896359-3,"Jungle Book 2, The",12/28/2017
558973823-7,Star Trek III: The Search for Spock,4/19/2017
411502148-9,"Verdict, The",10/3/2017
449293194-5,"Other Dream Team, The",2/6/2018
406062162-5,Ong-Bak: The Thai Warrior (Ong Bak),3/15/2018
952684615-X,Asphalt Playground (La cité rose),8/20/2017
737347720-8,"Children of Leningradsky, The",3/19/2018
185454892-1,"Eyewitness (Janitor, The)",3/13/2018
537833183-8,Desk Set,7/25/2017
751635055-9,"Last Truck: Closing of a GM Plant, The",10/3/2017


#### Create a date type column

In [38]:
df_from_json = df_from_json.withColumn(
    'cast_date',
    F.to_date('date', 'M/d/y') # MM/dd/yyyy would fail if some dates don't comply with this format, using single mask values will work for every combination
)

In [39]:
df_from_json.printSchema()

root
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- age: long (nullable = true)
 |    |    |-- first: string (nullable = true)
 |    |    |-- last: string (nullable = true)
 |-- date: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- price: double (nullable = true)
 |-- publisher: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- number: string (nullable = true)
 |    |    |-- street: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- phone: string (nullable = true)
 |-- title: string (nullable = true)
 |-- cast_date: date (nullable = true)



#### Drop a column

In [40]:
df_from_json = df_from_json.drop('date')
df_from_json.printSchema()

root
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- age: long (nullable = true)
 |    |    |-- first: string (nullable = true)
 |    |    |-- last: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- price: double (nullable = true)
 |-- publisher: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- number: string (nullable = true)
 |    |    |-- street: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- phone: string (nullable = true)
 |-- title: string (nullable = true)
 |-- cast_date: date (nullable = true)



#### Renaming a column

In [41]:
df_from_json = df_from_json.withColumnRenamed('cast_date', 'date')
df_from_json.printSchema()

root
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- age: long (nullable = true)
 |    |    |-- first: string (nullable = true)
 |    |    |-- last: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- price: double (nullable = true)
 |-- publisher: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- number: string (nullable = true)
 |    |    |-- street: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- phone: string (nullable = true)
 |-- title: string (nullable = true)
 |-- date: date (nullable = true)



In [44]:
# Now that we have a date column, we can do a lot more!

# Select When 
df_from_json.select(
    "title",
    F.year("date"),
    F.when(
        F.year("date") >= 2018, "Modern book"
    ).otherwise("Old book").alias("Modern literature")
)

title,year(date),Modern literature
"Jungle Book 2, The",2017,Old book
Star Trek III: The Search for Spock,2017,Old book
"Verdict, The",2017,Old book
"Other Dream Team, The",2018,Modern book
Ong-Bak: The Thai Warrior (Ong Bak),2018,Modern book
Asphalt Playground (La cité rose),2017,Old book
"Children of Leningradsky, The",2018,Modern book
"Eyewitness (Janitor, The)",2018,Modern book
Desk Set,2017,Old book
"Last Truck: Closing of a GM Plant, The",2017,Old book


#### String manipulation functions

In [45]:
# Using the function as a test (T or F)
display(df_nation.select("n_name", F.lower("n_name").startswith('ger')).limit(10))

n_name,"startswith(lower(n_name), ger)"
ALGERIA,False
ARGENTINA,False
BRAZIL,False
CANADA,False
EGYPT,False
ETHIOPIA,False
FRANCE,False
GERMANY,True
INDIA,False
INDONESIA,False


In [46]:
display(df_nation.select("n_name", "n_comment").where(F.upper("n_name").endswith('ZIL')))

n_name,n_comment
BRAZIL,y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special


In [47]:
# Contains function, similarly we can use between, like and substring
df_nation.select(df_nation.n_name).where(F.col('n_name').contains('IA')).limit(10)

n_name
ALGERIA
ETHIOPIA
INDIA
INDONESIA
ROMANIA
SAUDI ARABIA
RUSSIA


#### Group by, filter and sort functions

In [48]:
# Count number of nations per region
df_nation.groupby('n_regionkey').count()

n_regionkey,count
1,5
3,5
4,5
2,5
0,5


In [49]:
# Sort books by year published
df_from_json.select("title", "date").sort(F.col('date').desc()).limit(20)

title,date
I Want You,2018-04-03
"Formula, The",2018-04-03
"Story of Esther Costello, The",2018-04-02
"Curse of the Cat People, The",2018-04-01
"Dam Busters, The",2018-04-01
Playing by Heart,2018-04-01
Momo,2018-04-01
Chang: A Drama of the Wilderness,2018-03-31
Rape Me (Baise-moi),2018-03-31
Vampire Effect (The Twins Effect) (Chin gei bin),2018-03-31


In [50]:
# Filter, with multiple ways to access the rows in a dataframe
df_region.filter(df_region["r_regionkey"] > 2)

r_regionkey,r_name,r_comment
3,EUROPE,ly final courts cajole furiously final excuse
4,MIDDLE EAST,uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl


In [51]:
df_region.filter(df_region.r_regionkey > 2)

r_regionkey,r_name,r_comment
3,EUROPE,ly final courts cajole furiously final excuse
4,MIDDLE EAST,uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl


In [52]:
df_region.filter(F.col("r_regionkey") > 2)

r_regionkey,r_name,r_comment
3,EUROPE,ly final courts cajole furiously final excuse
4,MIDDLE EAST,uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl


#### Spark supports joins: outer, inner, right, left
https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-join.html

In [53]:
df_region.join(df_nation, df_region.r_regionkey == df_nation.n_regionkey).select('r_name', 'n_name').where(F.col('r_name') == 'AMERICA')

r_name,n_name
AMERICA,ARGENTINA
AMERICA,BRAZIL
AMERICA,CANADA
AMERICA,PERU
AMERICA,UNITED STATES


In [54]:
# Write joins specifying the join type
display(Markdown(" Joining on df_region.**r_regionkey** == df_nation.**n_nationkey** (not the foreign key) to display some nulls and illustrate the joins"))
df_region.join(df_nation, df_region.r_regionkey == df_nation.n_nationkey, "right").limit(10)

 Joining on df_region.**r_regionkey** == df_nation.**n_nationkey** (not the foreign key) to display some nulls and illustrate the joins

r_regionkey,r_name,r_comment,n_nationkey,n_name,n_regionkey,n_comment
0.0,AFRICA,lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to,0,ALGERIA,0,haggle. carefully final deposits detect slyly agai
1.0,AMERICA,"hs use ironic, even requests. s",1,ARGENTINA,1,al foxes promise slyly according to the regular accounts. bold requests alon
2.0,ASIA,ges. thinly even pinto beans ca,2,BRAZIL,1,y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special
3.0,EUROPE,ly final courts cajole furiously final excuse,3,CANADA,1,"eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold"
4.0,MIDDLE EAST,uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl,4,EGYPT,4,y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d
,,,5,ETHIOPIA,0,ven packages wake quickly. regu
,,,6,FRANCE,3,"refully final requests. regular, ironi"
,,,7,GERMANY,3,"l platelets. regular accounts x-ray: unusual, regular acco"
,,,8,INDIA,2,ss excuses cajole slyly across the packages. deposits print aroun
,,,9,INDONESIA,2,slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull


In [56]:
df_nation.join(df_region, df_region.r_regionkey == df_nation.n_nationkey, "outer").limit(10)

n_nationkey,n_name,n_regionkey,n_comment,r_regionkey,r_name,r_comment
0,ALGERIA,0,haggle. carefully final deposits detect slyly agai,0.0,AFRICA,lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to
1,ARGENTINA,1,al foxes promise slyly according to the regular accounts. bold requests alon,1.0,AMERICA,"hs use ironic, even requests. s"
2,BRAZIL,1,y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special,2.0,ASIA,ges. thinly even pinto beans ca
3,CANADA,1,"eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold",3.0,EUROPE,ly final courts cajole furiously final excuse
4,EGYPT,4,y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d,4.0,MIDDLE EAST,uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl
5,ETHIOPIA,0,ven packages wake quickly. regu,,,
6,FRANCE,3,"refully final requests. regular, ironi",,,
7,GERMANY,3,"l platelets. regular accounts x-ray: unusual, regular acco",,,
8,INDIA,2,ss excuses cajole slyly across the packages. deposits print aroun,,,
9,INDONESIA,2,slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull,,,


#### Save to file
By default Spark saves files to parquet files but we can specify a other formats like json, csv
<br>Spark will save one file per partition, generate metadata, etc. Examples: https://spark.apache.org/docs/latest/sql-data-sources-csv.html
<br>A cleaner and easier solution is to use Pandas https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html


In [58]:
df_region.join(df_nation, df_region.r_regionkey == df_nation.n_regionkey).select('r_name', 'n_name').where(F.col('r_name') == 'AFRICA') \
    .toPandas() \
    .to_csv('myFile.csv', sep=';', encoding='utf-8', index=False)

##### Stopping spark session

In [59]:
spark.stop()

###### End / Fin / Koniec