PYSPARK OPERATIONS



A. INSTALLATION

1. install the pyspark and findspark libraries from the terminal (pip install pyspark findspark)

2. import findspark and the pyspark modules
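
A quick way to confirm that step 1 worked, sketched with the standard library only. The helper name is_installed is hypothetical; it is not part of pyspark or findspark.

```python
import importlib.util


def is_installed(package_name):
    """Return True if a top-level package can be found on the import path."""
    return importlib.util.find_spec(package_name) is not None


# Report the status of the two libraries used in this notebook.
for name in ("pyspark", "findspark"):
    status = "installed" if is_installed(name) else "missing"
    print(f"{name}: {status}")
```

If either line prints "missing", rerun the install from step 1 in the same environment the notebook kernel uses.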



B. UNTYPED DATAFRAME OPERATIONS

1. create spark session

2. create dataframe from csv

3. print schema in a tree format

4. create dataframe from json

5. show entire dataframe

6. select and show one column from the dataframe

7. select several columns, and perform a mathematical operation on the values in one of the columns

8. filter rows based on a criterion

9. filter rows based on a criterion then select columns

10. group rows by the values in a field, count the rows in each group, show results



C. RUNNING SQL QUERIES PROGRAMMATICALLY

1. create temporary view from your dataframe

- the view disappears when the session that created it terminates


2. run an sql query on the temporary view by passing the query string into the sql() function


3. create global temporary view

- shared among all sessions
- stays alive until the spark application terminates
- the view is tied to global_temp, a system-preserved database
- refer to your view by its qualified name, global_temp.name_of_your_view


4. access the global temporary view from a new session using the newSession() method






APPENDIX

1. difference between a package and a module

https://www.tutorialspoint.com/What-is-the-difference-between-a-python-module-and-a-python-package


2. rough hierarchy: library > package > module > class > method (a module inside a package is called a sub module)


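3. difference between a module and a sub module

a sub module is a module that lives inside a package

from package import submodule

eg. from xml.dom import minidom

(from datetime import date imports a class, not a sub module)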
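
To check what a given import statement actually hands back, the standard inspect module can be used. This is a minimal sketch; the variable names are only illustrative.

```python
import inspect
import datetime
import xml.dom
from xml.dom import minidom

# xml.dom is a package: packages carry a __path__ attribute.
dom_is_package = hasattr(xml.dom, "__path__")

# minidom is a sub module of xml.dom: a module, but not itself a package.
minidom_is_module = inspect.ismodule(minidom)
minidom_is_package = hasattr(minidom, "__path__")

# datetime.date is a class defined inside the datetime module.
date_is_class = inspect.isclass(datetime.date)
date_is_module = inspect.ismodule(datetime.date)

print(dom_is_package, minidom_is_module, minidom_is_package)
print(date_is_class, date_is_module)
```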


4. module, package and directory

a module is a python file

a package is a directory of modules that contains an __init__.py file

a plain directory without __init__.py is not a regular package (python 3.3+ can still import it as a namespace package)
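
The relationship between a directory, a package and a module can be tried out in a throwaway folder. This is a sketch using only the standard library; the names demo_pkg, area and square are hypothetical.

```python
import importlib
import sys
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as root:
    pkg_dir = Path(root) / "demo_pkg"
    pkg_dir.mkdir()
    # __init__.py marks the directory as a regular package.
    (pkg_dir / "__init__.py").write_text("")
    # area.py is a module inside that package.
    (pkg_dir / "area.py").write_text("def square(side):\n    return side * side\n")

    sys.path.insert(0, root)          # make the temporary folder importable
    importlib.invalidate_caches()
    try:
        area = importlib.import_module("demo_pkg.area")
        result = area.square(4)
        is_package = hasattr(sys.modules["demo_pkg"], "__path__")
    finally:
        sys.path.remove(root)

print(result, is_package)
```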

In [3]:
# import libraries

import findspark

In [4]:
findspark.init()

In [5]:
from pyspark import SparkContext

In [6]:
from pyspark import SparkConf

In [7]:
from pyspark.sql import SparkSession

In [8]:
# create spark session

sparksession1 = SparkSession.builder.appName('Ops').getOrCreate()


# or, with an extra configuration option:
#
# sparksession1 = SparkSession.builder.appName('Ops').config("spark.some.config.option", "some-value").getOrCreate()

In [9]:
# create dataframe from json (kept for reference, not run here)
#
# df = sparksession1.read.json("examples/src/main/resources/people.json")
#
# displays the content of the DataFrame to stdout
# df.show()

In [10]:
# create dataframe from csv

df = sparksession1.read.csv('heart.csv', inferSchema=True, header=True)

In [13]:
# print the schema in a tree format

df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- cp: integer (nullable = true)
 |-- trestbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: integer (nullable = true)
 |-- restecg: integer (nullable = true)
 |-- thalach: integer (nullable = true)
 |-- exang: integer (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: integer (nullable = true)
 |-- ca: integer (nullable = true)
 |-- thal: integer (nullable = true)
 |-- target: integer (nullable = true)



In [14]:
# show entire dataframe

df.show()

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 63|  1|  3|     145| 233|  1|      0|    150|    0|    2.3|    0|  0|   1|     1|
| 37|  1|  2|     130| 250|  0|      1|    187|    0|    3.5|    0|  0|   2|     1|
| 41|  0|  1|     130| 204|  0|      0|    172|    0|    1.4|    2|  0|   2|     1|
| 56|  1|  1|     120| 236|  0|      1|    178|    0|    0.8|    2|  0|   2|     1|
| 57|  0|  0|     120| 354|  0|      1|    163|    1|    0.6|    2|  0|   2|     1|
| 57|  1|  0|     140| 192|  0|      1|    148|    0|    0.4|    1|  0|   1|     1|
| 56|  0|  1|     140| 294|  0|      0|    153|    0|    1.3|    1|  0|   2|     1|
| 44|  1|  1|     120| 263|  0|      1|    173|    0|    0.0|    2|  0|   3|     1|
| 52|  1|  2|     172| 199|  1|      1|    162|    0|    0.5|    2|  0|   3|

In [12]:
# select and show one column from the dataframe

df.select('age').show()

+---+
|age|
+---+
| 63|
| 37|
| 41|
| 56|
| 57|
| 57|
| 56|
| 44|
| 52|
| 57|
| 54|
| 48|
| 49|
| 64|
| 58|
| 50|
| 58|
| 66|
| 43|
| 69|
+---+
only showing top 20 rows



In [15]:
# select several columns, and perform a mathematical operation on the values in one of the columns

df.select(df['age'] * 8, df['sex']).show()

+---------+---+
|(age * 8)|sex|
+---------+---+
|      504|  1|
|      296|  1|
|      328|  0|
|      448|  1|
|      456|  0|
|      456|  1|
|      448|  0|
|      352|  1|
|      416|  1|
|      456|  1|
|      432|  1|
|      384|  0|
|      392|  1|
|      512|  1|
|      464|  0|
|      400|  0|
|      464|  0|
|      528|  0|
|      344|  1|
|      552|  0|
+---------+---+
only showing top 20 rows



In [18]:
# divide the values in one column by a constant, and select a second column alongside it

df.select(df['chol'] / 300, df['thal']).show()

+------------------+----+
|      (chol / 300)|thal|
+------------------+----+
|0.7766666666666666|   1|
|0.8333333333333334|   2|
|              0.68|   2|
|0.7866666666666666|   2|
|              1.18|   2|
|              0.64|   1|
|              0.98|   2|
|0.8766666666666667|   3|
|0.6633333333333333|   3|
|              0.56|   2|
|0.7966666666666666|   2|
|0.9166666666666666|   2|
|0.8866666666666667|   2|
|0.7033333333333334|   2|
|0.9433333333333334|   2|
|              0.73|   2|
|1.1333333333333333|   2|
|0.7533333333333333|   2|
|0.8233333333333334|   2|
|0.7966666666666666|   2|
+------------------+----+
only showing top 20 rows



In [19]:
# filter rows based on a criterion

df.filter(df['age'] > 55).show()

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 63|  1|  3|     145| 233|  1|      0|    150|    0|    2.3|    0|  0|   1|     1|
| 56|  1|  1|     120| 236|  0|      1|    178|    0|    0.8|    2|  0|   2|     1|
| 57|  0|  0|     120| 354|  0|      1|    163|    1|    0.6|    2|  0|   2|     1|
| 57|  1|  0|     140| 192|  0|      1|    148|    0|    0.4|    1|  0|   1|     1|
| 56|  0|  1|     140| 294|  0|      0|    153|    0|    1.3|    1|  0|   2|     1|
| 57|  1|  2|     150| 168|  0|      1|    174|    0|    1.6|    2|  0|   2|     1|
| 64|  1|  3|     110| 211|  0|      0|    144|    1|    1.8|    1|  0|   2|     1|
| 58|  0|  3|     150| 283|  1|      0|    162|    0|    1.0|    2|  0|   2|     1|
| 58|  0|  2|     120| 340|  0|      1|    172|    0|    0.0|    2|  0|   2|

In [22]:
# filter rows based on a criterion then select specific columns

df2 = df.filter(df['age'] > 55)

df2.select('age').show()

+---+
|age|
+---+
| 63|
| 56|
| 57|
| 57|
| 56|
| 57|
| 64|
| 58|
| 58|
| 66|
| 69|
| 59|
| 61|
| 71|
| 59|
| 65|
| 65|
| 65|
| 65|
| 66|
+---+
only showing top 20 rows



In [23]:
# take a field, group identical values, count them, show results

df.groupBy('age').count().show()

+---+-----+
|age|count|
+---+-----+
| 65|    8|
| 53|    8|
| 34|    2|
| 76|    1|
| 44|   11|
| 47|    5|
| 52|   13|
| 40|    3|
| 57|   17|
| 54|   16|
| 48|    7|
| 64|   10|
| 41|   10|
| 43|    8|
| 37|    2|
| 61|    8|
| 35|    4|
| 59|   14|
| 55|    8|
| 39|    4|
+---+-----+
only showing top 20 rows



In [24]:
# create temporary view from your dataframe

df.createOrReplaceTempView("temporary")

In [27]:
# run an sql query on your temporary view by passing the query string into the sql() function

sql_df = sparksession1.sql("SELECT * FROM temporary WHERE age = 37")
sql_df.show()

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 37|  1|  2|     130| 250|  0|      1|    187|    0|    3.5|    0|  0|   2|     1|
| 37|  0|  2|     120| 215|  0|      1|    170|    0|    0.0|    2|  0|   2|     1|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+



In [28]:
# create global temporary view (shared among all sessions)

df.createGlobalTempView("tempglobal")

In [30]:
# query the global temporary view through the global_temp database

sql_df_global = sparksession1.sql("SELECT * FROM global_temp.tempglobal WHERE sex = 1")
sql_df_global.show()

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 63|  1|  3|     145| 233|  1|      0|    150|    0|    2.3|    0|  0|   1|     1|
| 37|  1|  2|     130| 250|  0|      1|    187|    0|    3.5|    0|  0|   2|     1|
| 56|  1|  1|     120| 236|  0|      1|    178|    0|    0.8|    2|  0|   2|     1|
| 57|  1|  0|     140| 192|  0|      1|    148|    0|    0.4|    1|  0|   1|     1|
| 44|  1|  1|     120| 263|  0|      1|    173|    0|    0.0|    2|  0|   3|     1|
| 52|  1|  2|     172| 199|  1|      1|    162|    0|    0.5|    2|  0|   3|     1|
| 57|  1|  2|     150| 168|  0|      1|    174|    0|    1.6|    2|  0|   2|     1|
| 54|  1|  0|     140| 239|  0|      1|    160|    0|    1.2|    2|  0|   2|     1|
| 49|  1|  1|     130| 266|  0|      1|    171|    0|    0.6|    2|  0|   2|

In [33]:
# access the global temporary view from a new session using the newSession() method

# the view already exists from the earlier df.createGlobalTempView("tempglobal") call, so it is not created again

sparksession1.newSession().sql("SELECT * FROM global_temp.tempglobal WHERE chol > 200").show()

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 63|  1|  3|     145| 233|  1|      0|    150|    0|    2.3|    0|  0|   1|     1|
| 37|  1|  2|     130| 250|  0|      1|    187|    0|    3.5|    0|  0|   2|     1|
| 41|  0|  1|     130| 204|  0|      0|    172|    0|    1.4|    2|  0|   2|     1|
| 56|  1|  1|     120| 236|  0|      1|    178|    0|    0.8|    2|  0|   2|     1|
| 57|  0|  0|     120| 354|  0|      1|    163|    1|    0.6|    2|  0|   2|     1|
| 56|  0|  1|     140| 294|  0|      0|    153|    0|    1.3|    1|  0|   2|     1|
| 44|  1|  1|     120| 263|  0|      1|    173|    0|    0.0|    2|  0|   3|     1|
| 54|  1|  0|     140| 239|  0|      1|    160|    0|    1.2|    2|  0|   2|     1|
| 48|  0|  2|     130| 275|  0|      1|    139|    0|    0.2|    2|  0|   2|