## Spark setup

In [0]:
!wget -q http://apache.forsale.plus/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz

In [0]:
!tar xf /content/spark-2.4.5-bin-hadoop2.7.tgz

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/usr/lib/jvm/java-8-openjdk-amd64/bin:" + os.environ["PATH"]
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [4]:
!pip install -q findspark
!pip install -q pyspark

[K     |████████████████████████████████| 217.8MB 60kB/s 
[K     |████████████████████████████████| 204kB 48.0MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [5]:
!git clone https://github.com/databricks/Spark-The-Definitive-Guide.git

Cloning into 'Spark-The-Definitive-Guide'...
remote: Enumerating objects: 2035, done.[K
remote: Total 2035 (delta 0), reused 0 (delta 0), pack-reused 2035[K
Receiving objects: 100% (2035/2035), 523.88 MiB | 31.35 MiB/s, done.
Resolving deltas: 100% (305/305), done.
Checking out files: 100% (1738/1738), done.


## Spark instance

In [0]:
import findspark as fs
fs.init()

In [0]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

APP_NAME = "Playgrounds"
SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()

## Spark code

In [8]:
!head /content/Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary.csv

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40


In [0]:
df = spark.read.option("header", "true").csv('/content/Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary.csv')
df.createOrReplaceTempView("dfTable")

In [14]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: string (nullable = true)



In [12]:
# create own DataFrame
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, LongType

mySchema = StructType([
  StructField("name", StringType(), False),
  StructField("age", LongType(), False)     
])

myRow = Row("Peter", 23)
myDf = spark.createDataFrame([myRow], mySchema)
myDf.show()

+-----+---+
| name|age|
+-----+---+
|Peter| 23|
+-----+---+



In [32]:
# select and selectExpr
from pyspark.sql.functions import col, expr

df.select("DEST_COUNTRY_NAME").show(2)
df.select(col("ORIGIN_COUNTRY_NAME")).show(2)

# expr 
df.select(expr("length(DEST_COUNTRY_NAME)")).show(2)

# adding column
df.selectExpr("*", "(DEST_COUNTRY_NAME=ORIGIN_COUNTRY_NAME) as INTERNAL_FLIGHT").show(2)

# aggregates
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows

+-------------------+
|ORIGIN_COUNTRY_NAME|
+-------------------+
|            Romania|
|            Croatia|
+-------------------+
only showing top 2 rows

+-------------------------+
|length(DEST_COUNTRY_NAME)|
+-------------------------+
|                       13|
|                       13|
+-------------------------+
only showing top 2 rows

+-----------------+-------------------+-----+---------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|INTERNAL_FLIGHT|
+-----------------+-------------------+-----+---------------+
|    United States|            Romania|   15|          false|
|    United States|            Croatia|    1|          false|
+-----------------+-------------------+-----+---------------+
only showing top 2 rows

+--------------------------+---------------------------------+
|avg(CAST(count AS DOUBLE))|count(DISTINCT DE