<a href="https://colab.research.google.com/github/punkmic/pyspark-concepts/blob/master/pyspark_intro.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### **Connect to a private GitHub repository using SSH**

In [5]:
! ssh-keygen -t ed25519 
# Add github.com to our known hosts
! ssh-keyscan -t ed25519 github.com >> ~/.ssh/known_hosts
# Restrict the key permissions, or else SSH will complain.
! chmod go-rwx /root/.ssh/id_ed25519

Generating public/private ed25519 key pair.
Enter file in which to save the key (/root/.ssh/id_ed25519): 
Created directory '/root/.ssh'.
Enter passphrase (empty for no passphrase): 
Enter same passphrase again: 
Your identification has been saved in /root/.ssh/id_ed25519.
Your public key has been saved in /root/.ssh/id_ed25519.pub.
The key fingerprint is:
SHA256:AhpyrO/7SuNpdchHd2wEsHVgS3gib9PNRXL4kkLIT0U root@bf2bb6f56919
The key's randomart image is:
+--[ED25519 256]--+
|     ..+*=Eoo    |
| .  . ==+ooo.    |
|. + .o.B.= +     |
| + o .= = O .    |
|. .. +.oS+ .     |
| .  + o.         |
|  +. o           |
| +.o             |
| .*+.            |
+----[SHA256]-----+
# github.com:22 SSH-2.0-babeld-408889af


In [6]:
!cat /root/.ssh/id_ed25519.pub

ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIJukP/0AWx8LXkNQnB+iQVCsQ6EuQjTA8Yh02ID1oc/5 root@bf2bb6f56919


In [9]:
!git config --global user.email "mattheus_ribeiro@outlook.com"
!git config --global user.name "punkmic"

In [10]:
!ssh -T git@github.com

Hi punkmic! You've successfully authenticated, but GitHub does not provide shell access.


In [11]:
!git clone git@github.com:punkmic/pyspark-concepts.git

Cloning into 'pyspark-concepts'...
remote: Enumerating objects: 7, done.
remote: Counting objects: 100% (7/7), done.
remote: Compressing objects: 100% (5/5), done.
remote: Total 7 (delta 1), reused 3 (delta 0), pack-reused 0
Receiving objects: 100% (7/7), done.
Resolving deltas: 100% (1/1), done.


In [12]:
%cd /content/pyspark-concepts/

/content/pyspark-concepts


### **Install dependencies**

In [17]:
!touch requirements.txt

In [21]:
!echo "findspark" >> requirements.txt
!echo "pyspark" >> requirements.txt
!echo "pyspark[sql]" >> requirements.txt
!echo "pyspark[pandas_on_spark]" >> requirements.txt

In [41]:
!pip install -r requirements.txt

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


### **Import dependencies**

In [70]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
import os
# for COLAB
from pyspark import SparkContext, SparkConf

**Spark: a cluster computing framework that processes big data at lightning speed**



* PySpark DataFrames have no row index, unlike pandas DataFrames
* PySpark DataFrames and RDDs are immutable: every transformation returns a new object
* Spark runs on a cluster: a set of connected computers






Benefits of Spark:



*   Faster processing speed
*   Large storage capacity
*   Superior reliability



What is Spark



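*   It's written in Scala
*   Spark can use Hadoop for storage (HDFS) and resource management (YARN), but does not require it
*   Lazy computation: transformations run only when an action asks for a result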
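
The lazy-computation point can be illustrated without a cluster. The sketch below is a plain-Python analogy built on generators, not Spark's actual execution engine; the names `log`, `lazy_filter`, and `lazy_double` are made up for the illustration. In PySpark the same split exists between transformations such as `filter` and `select`, which only describe work, and actions such as `count` and `show`, which trigger it.

```python
log = []  # records when each value is actually processed


def lazy_filter(values, predicate):
    """Yield matching values one at a time, only when a consumer asks."""
    for value in values:
        log.append(f"filter {value}")
        if predicate(value):
            yield value


def lazy_double(values):
    """Double each incoming value, again only on demand."""
    for value in values:
        log.append(f"double {value}")
        yield value * 2


# "Transformations": building the pipeline does no work yet.
pipeline = lazy_double(lazy_filter([10, 16, 15], lambda age: age > 12))
steps_before_action = len(log)

# "Action": materializing the result pulls data through every step.
result = list(pipeline)
```

Nothing is logged until `list(pipeline)` runs, which mirrors how Spark defers work until an action is called.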



In [52]:
# get or create a new session
conf = SparkConf().set('spark.ui.port', '4050') # for colab
sc = SparkContext(conf=conf) # for colab
spark = SparkSession.builder.master('local[*]').appName("PySpark").getOrCreate()
spark

In [67]:
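# count the executors registered with the driver (not CPU cores);
# spark.sparkContext.defaultParallelism reports the cores used by local[*]
executors = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
executors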

1

In [26]:
data = [['max', 10], ['chloe', 16], ['rachel', 15]]
df = spark.createDataFrame(data, ['Name', 'Age'])

In [27]:
df.show()

+------+---+
|  Name|Age|
+------+---+
|   max| 10|
| chloe| 16|
|rachel| 15|
+------+---+



In [28]:
df.toPandas()

     Name  Age
0     max   10
1   chloe   16
2  rachel   15


In [30]:
df.columns

['Name', 'Age']

In [31]:
df.count()

3

In [98]:
path = 'students.csv'
df = spark.read.csv(path, header=True)
df.show(5)

+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|gender|race/ethnicity|parental level of education|       lunch|test preparation course|math score|reading score|writing score|
+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|female|       group B|          bachelor's degree|    standard|                   none|        72|           72|           74|
|female|       group C|               some college|    standard|              completed|        69|           90|           88|
|female|       group B|            master's degree|    standard|                   none|        90|           95|           93|
|  male|       group A|         associate's degree|free/reduced|                   none|        47|           57|           44|
|  male|       group C|               some college|    standard|                   none|        76|     

### **Aggregate Data**

In [99]:
df.groupBy("gender").agg({'math score':'mean'}).show()

+------+------------------+
|gender|   avg(math score)|
+------+------------------+
|female|63.633204633204635|
|  male| 68.72821576763485|
+------+------------------+



In [100]:
from pyspark.sql import functions as F
# the score columns are still strings here, so min/max compare text, not numbers
df.groupBy('gender').agg(F.min('math score'), F.max('math score'), F.avg('math score')).show()

+------+---------------+---------------+------------------+
|gender|min(math score)|max(math score)|   avg(math score)|
+------+---------------+---------------+------------------+
|female|              0|             99|63.633204633204635|
|  male|            100|             99| 68.72821576763485|
+------+---------------+---------------+------------------+



In [91]:
DATASETS_DIR = './datasets/'
os.makedirs(DATASETS_DIR, exist_ok=True)  # create the folder if it is missing
path = DATASETS_DIR + 'users*'  # glob pattern: every file whose name starts with 'users'
print(path)
parquet = spark.read.parquet(path)

./datasets/users*


In [92]:
parquet.limit(3).toPandas()

Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
0,2016-02-03 07:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
1,2016-02-03 17:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,
2,2016-02-03 01:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,


In [93]:
users1_2 = spark.read.parquet(DATASETS_DIR+'users1.parquet', DATASETS_DIR+'users2.parquet')

In [94]:
users1_2.limit(3).toPandas()

Unnamed: 0,registration_dttm,id,first_name,last_name,email,gender,ip_address,cc,country,birthdate,salary,title,comments
0,2016-02-03 07:55:29,1,Amanda,Jordan,ajordan0@com.com,Female,1.197.201.2,6759521864920116.0,Indonesia,3/8/1971,49756.53,Internal Auditor,100.0
1,2016-02-03 17:04:03,2,Albert,Freeman,afreeman1@is.gd,Male,218.111.175.34,,Canada,1/16/1968,150280.17,Accountant IV,
2,2016-02-03 01:09:31,3,Evelyn,Morgan,emorgan2@altervista.org,Female,7.161.136.94,6767119071901597.0,Russia,2/1/1960,144972.51,Structural Engineer,


### **Validating Data**

In [95]:
df.printSchema()

root
 |-- gender: string (nullable = true)
 |-- race/ethnicity: string (nullable = true)
 |-- parental level of education: string (nullable = true)
 |-- lunch: string (nullable = true)
 |-- test preparation course: string (nullable = true)
 |-- math score: string (nullable = true)
 |-- reading score: string (nullable = true)
 |-- writing score: string (nullable = true)



In [101]:
df.select('math score', 'reading score').summary('count', 'min', 'max').show()

+-------+----------+-------------+
|summary|math score|reading score|
+-------+----------+-------------+
|  count|      1000|         1000|
|    min|         0|          100|
|    max|        99|           99|
+-------+----------+-------------+
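
The `min` of 100 and `max` of 99 for `reading score` look wrong because the schema above stores every score as a string, so the values are compared character by character. The sketch below reproduces that ordering in plain Python with made-up values, then shows one possible fix as a hypothetical helper, `cast_to_int`, which is not part of the original notebook. It assumes a PySpark DataFrame and relies on `DataFrame.withColumn` and `Column.cast`.

```python
# Made-up sample of scores, stored as text the way the CSV reader loaded them.
scores_as_text = ["72", "100", "99", "57"]

# Text is compared character by character, so "100" sorts before "57".
text_min, text_max = min(scores_as_text), max(scores_as_text)

# Converting to integers restores the numeric ordering.
scores_as_int = [int(s) for s in scores_as_text]
int_min, int_max = min(scores_as_int), max(scores_as_int)


def cast_to_int(df, columns):
    """Return a new DataFrame with the given columns cast to integers.

    Hypothetical helper: `df` is assumed to be a PySpark DataFrame.
    """
    # Imported here so the plain-Python demo above runs without Spark.
    from pyspark.sql import functions as F

    for name in columns:
        df = df.withColumn(name, F.col(name).cast("int"))
    return df
```

With the notebook's DataFrame this might be called as `cast_to_int(df, ['math score', 'reading score', 'writing score'])`. Passing `inferSchema=True` to `spark.read.csv` is another option, at the cost of an extra pass over the file.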



### **Specify data types**

In [102]:
from pyspark.sql.types import *

In [113]:
data_schema = [StructField('name', StringType(), True),
               StructField('email', StringType(), True),
               StructField('city', StringType(), True),
               StructField('mac', StringType(), True),
               StructField('timestamp', DateType(), True),
               StructField('creditcard', StringType(), True)
]

In [114]:
final_struc = StructType(fields = data_schema)

In [115]:
people = spark.read.json(DATASETS_DIR+'people.json', final_struc)

In [116]:
people.printSchema()

root
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- city: string (nullable = true)
 |-- mac: string (nullable = true)
 |-- timestamp: date (nullable = true)
 |-- creditcard: string (nullable = true)



### **Writing Data**

In [117]:
# Spark creates a directory named write_test.csv that holds the part files
df.write.mode('overwrite').csv('write_test.csv')

In [121]:
# write the DataFrame as a Parquet dataset
users1_2.write.mode('overwrite').parquet('parquet/')

In [122]:
# write one subdirectory per distinct gender value
users1_2.write.mode('overwrite').partitionBy('gender').parquet('part_parquet/')
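
`partitionBy('gender')` writes one subdirectory per distinct value of the column, named in the `gender=<value>` style, so the number of partitions depends on the data. A small sketch for inspecting that layout after the write, using a hypothetical helper `list_partition_values` that is not part of the notebook:

```python
from pathlib import Path


def list_partition_values(root, column):
    """Return the values found in `column=value` subdirectories of `root`."""
    prefix = f"{column}="
    return sorted(
        entry.name[len(prefix):]
        for entry in Path(root).iterdir()
        if entry.is_dir() and entry.name.startswith(prefix)
    )
```

After the cell above has run, `list_partition_values('part_parquet/', 'gender')` would list the gender values that received their own folder.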