<a href="https://colab.research.google.com/github/ttwange/spark_learning/blob/main/Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=776d082b4dee157a5da2fd8dd8bfdfea836e5d632b2a83f190833dc90b9cd616
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
import pandas as pd
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder\
.appName("MyProcess")\
.master("local[*]")\
.getOrCreate()

In [4]:
spark

In [6]:
header = ["city","type","price"]

data = map(lambda r: (r[0], r[1], float(r[2])),
           map(lambda x: x.split(","),
               ["Paris, Food, 19.00","Hola, Clothes, 12.00","Paris, Food, 19.00", "Rongai, Chapo, 20.50","Paris, Food, 19.00", "Riruta, House, 99.50","Mary, Book, 5.00","Rongai, Chapo, 20.50",]))
df = spark.createDataFrame(data, header)
df.show()


+------+--------+-----+
|  city|    type|price|
+------+--------+-----+
| Paris|    Food| 19.0|
|  Hola| Clothes| 12.0|
| Paris|    Food| 19.0|
|Rongai|   Chapo| 20.5|
| Paris|    Food| 19.0|
|Riruta|   House| 99.5|
|  Mary|    Book|  5.0|
|Rongai|   Chapo| 20.5|
+------+--------+-----+



In [7]:
df.take(2)

[Row(city='Paris', type=' Food', price=19.0),
 Row(city='Hola', type=' Clothes', price=12.0)]

In [8]:
df.printSchema()

root
 |-- city: string (nullable = true)
 |-- type: string (nullable = true)
 |-- price: double (nullable = true)



In [9]:
df

DataFrame[city: string, type: string, price: double]

In [11]:
df.dtypes

[('city', 'string'), ('type', 'string'), ('price', 'double')]

In [12]:
df.explain()

== Physical Plan ==
*(1) Scan ExistingRDD[city#19,type#20,price#21]




In [13]:
df.select("city").show()

+------+
|  city|
+------+
| Paris|
|  Hola|
| Paris|
|Rongai|
| Paris|
|Riruta|
|  Mary|
|Rongai|
+------+



In [14]:
df.select(["city", "type"]).show()

+------+--------+
|  city|    type|
+------+--------+
| Paris|    Food|
|  Hola| Clothes|
| Paris|    Food|
|Rongai|   Chapo|
| Paris|    Food|
|Riruta|   House|
|  Mary|    Book|
|Rongai|   Chapo|
+------+--------+



Step 3:Basic manipulation : Schema


In [15]:
from pyspark.sql.types import StringType, FloatType, StructType, StructField

In [44]:
data = map(lambda r: (r[0], r[1], float(r[2])),
           map(lambda x: x.split(","),
               ["Paris, Food, 19.00","Hola, Clothes, 12.00","Paris, Snack, 19.00", "Rongai, Chapo, 20.50","Paris, Food, 19.00", "Riruta, House, 99.50","Mary, Book, 5.00","Rongai, Chapo, 20.50",]))

In [45]:
schema = StructType([
    StructField("city", StringType(),nullable = True),
    StructField("type", StringType(),nullable = True),
    StructField("price", FloatType(),nullable = True)
])

In [46]:
schema

StructType([StructField('city', StringType(), True), StructField('type', StringType(), True), StructField('price', FloatType(), True)])

In [47]:
df = spark.createDataFrame(
    data,
    schema=schema
)

In [60]:
df

DataFrame[city: string, type: string, price: float]

In [59]:
df.show()

+------+--------+-----+
|  city|    type|price|
+------+--------+-----+
| Paris|    Food| 19.0|
|  Hola| Clothes| 12.0|
| Paris|   Snack| 19.0|
|Rongai|   Chapo| 20.5|
| Paris|    Food| 19.0|
|Riruta|   House| 99.5|
|  Mary|    Book|  5.0|
|Rongai|   Chapo| 20.5|
+------+--------+-----+



Part 4: Filter and other functions in Pyspark


In [62]:
df.filter(df.city == "Paris").show()

+-----+------+-----+
| city|  type|price|
+-----+------+-----+
|Paris|  Food| 19.0|
|Paris| Snack| 19.0|
|Paris|  Food| 19.0|
+-----+------+-----+



In [64]:
df.filter((df.city == "Paris") & (df.price == "19.00")).show()

+-----+------+-----+
| city|  type|price|
+-----+------+-----+
|Paris|  Food| 19.0|
|Paris| Snack| 19.0|
|Paris|  Food| 19.0|
+-----+------+-----+



Part 5: OrderBy statements

In [65]:
df.orderBy(df.city).show()

+------+--------+-----+
|  city|    type|price|
+------+--------+-----+
|  Hola| Clothes| 12.0|
|  Mary|    Book|  5.0|
| Paris|    Food| 19.0|
| Paris|    Food| 19.0|
| Paris|   Snack| 19.0|
|Riruta|   House| 99.5|
|Rongai|   Chapo| 20.5|
|Rongai|   Chapo| 20.5|
+------+--------+-----+



In [67]:
df.orderBy(df.city).orderBy(df.price).show()

+------+--------+-----+
|  city|    type|price|
+------+--------+-----+
|  Mary|    Book|  5.0|
|  Hola| Clothes| 12.0|
| Paris|    Food| 19.0|
| Paris|    Food| 19.0|
| Paris|   Snack| 19.0|
|Rongai|   Chapo| 20.5|
|Rongai|   Chapo| 20.5|
|Riruta|   House| 99.5|
+------+--------+-----+



Part 6: Manipulating Columns with Pyspark

In [68]:
from pyspark.sql.functions import lit, rand
df = df.withColumn("six",lit(6))
df.show()

+------+--------+-----+---+
|  city|    type|price|six|
+------+--------+-----+---+
| Paris|    Food| 19.0|  6|
|  Hola| Clothes| 12.0|  6|
| Paris|   Snack| 19.0|  6|
|Rongai|   Chapo| 20.5|  6|
| Paris|    Food| 19.0|  6|
|Riruta|   House| 99.5|  6|
|  Mary|    Book|  5.0|  6|
|Rongai|   Chapo| 20.5|  6|
+------+--------+-----+---+



In [69]:
df.withColumn("discount", df.price * 0.75).show()

+------+--------+-----+---+--------+
|  city|    type|price|six|discount|
+------+--------+-----+---+--------+
| Paris|    Food| 19.0|  6|   14.25|
|  Hola| Clothes| 12.0|  6|     9.0|
| Paris|   Snack| 19.0|  6|   14.25|
|Rongai|   Chapo| 20.5|  6|  15.375|
| Paris|    Food| 19.0|  6|   14.25|
|Riruta|   House| 99.5|  6|  74.625|
|  Mary|    Book|  5.0|  6|    3.75|
|Rongai|   Chapo| 20.5|  6|  15.375|
+------+--------+-----+---+--------+



In [70]:
df.withColumn("New price", df.price + 6).show()

+------+--------+-----+---+---------+
|  city|    type|price|six|New price|
+------+--------+-----+---+---------+
| Paris|    Food| 19.0|  6|     25.0|
|  Hola| Clothes| 12.0|  6|     18.0|
| Paris|   Snack| 19.0|  6|     25.0|
|Rongai|   Chapo| 20.5|  6|     26.5|
| Paris|    Food| 19.0|  6|     25.0|
|Riruta|   House| 99.5|  6|    105.5|
|  Mary|    Book|  5.0|  6|     11.0|
|Rongai|   Chapo| 20.5|  6|     26.5|
+------+--------+-----+---+---------+



In [71]:
df.drop("six").show()

+------+--------+-----+
|  city|    type|price|
+------+--------+-----+
| Paris|    Food| 19.0|
|  Hola| Clothes| 12.0|
| Paris|   Snack| 19.0|
|Rongai|   Chapo| 20.5|
| Paris|    Food| 19.0|
|Riruta|   House| 99.5|
|  Mary|    Book|  5.0|
|Rongai|   Chapo| 20.5|
+------+--------+-----+



In [72]:
df.show()

+------+--------+-----+---+
|  city|    type|price|six|
+------+--------+-----+---+
| Paris|    Food| 19.0|  6|
|  Hola| Clothes| 12.0|  6|
| Paris|   Snack| 19.0|  6|
|Rongai|   Chapo| 20.5|  6|
| Paris|    Food| 19.0|  6|
|Riruta|   House| 99.5|  6|
|  Mary|    Book|  5.0|  6|
|Rongai|   Chapo| 20.5|  6|
+------+--------+-----+---+



In [73]:
df.sort(df.price.asc()).show()

+------+--------+-----+---+
|  city|    type|price|six|
+------+--------+-----+---+
|  Mary|    Book|  5.0|  6|
|  Hola| Clothes| 12.0|  6|
| Paris|    Food| 19.0|  6|
| Paris|    Food| 19.0|  6|
| Paris|   Snack| 19.0|  6|
|Rongai|   Chapo| 20.5|  6|
|Rongai|   Chapo| 20.5|  6|
|Riruta|   House| 99.5|  6|
+------+--------+-----+---+



Part 7: Rename columns

In [74]:
df.show()

+------+--------+-----+---+
|  city|    type|price|six|
+------+--------+-----+---+
| Paris|    Food| 19.0|  6|
|  Hola| Clothes| 12.0|  6|
| Paris|   Snack| 19.0|  6|
|Rongai|   Chapo| 20.5|  6|
| Paris|    Food| 19.0|  6|
|Riruta|   House| 99.5|  6|
|  Mary|    Book|  5.0|  6|
|Rongai|   Chapo| 20.5|  6|
+------+--------+-----+---+



In [75]:
df = df.withColumnRenamed("type", "Commodity")
df.show()

+------+---------+-----+---+
|  city|Commodity|price|six|
+------+---------+-----+---+
| Paris|     Food| 19.0|  6|
|  Hola|  Clothes| 12.0|  6|
| Paris|    Snack| 19.0|  6|
|Rongai|    Chapo| 20.5|  6|
| Paris|     Food| 19.0|  6|
|Riruta|    House| 99.5|  6|
|  Mary|     Book|  5.0|  6|
|Rongai|    Chapo| 20.5|  6|
+------+---------+-----+---+



GroupBy


In [111]:
SimpleData = [
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Emma", "Marketing", "CA", 85000, 29, 8000),
    ("Michael", "Engineering", "TX", 95000, 40, 12000),
    ("Sophia", "Human Resources", "IL", 80000, 31, 9000),
    ("William", "Finance", "NY", 92000, 36, 11000),
    ("Olivia", "Sales", "CA", 88000, 30, 9500),
    ("Daniel", "Marketing", "TX", 86000, 32, 8500),
    ("Ava", "Engineering", "CA", 98000, 38, 13000),
    ("Matthew", "Human Resources", "NY", 82000, 27, 7500),
    ("Emily", "Finance", "IL", 91000, 35, 10500),
    (" ", "Engineering", "CA", 93000, 33, 12500),
    ("Liam", None, "TX", 89000, 37, 10000),
    ("Oliver", "Sales"," " , 86000, 28, 9000),
    ("Harper", "Finance", "IL", None, 29, 9500)
]

schema = ["employee_name", "department", "state", "salary", "age", "bonus"]

df = spark.createDataFrame(data=SimpleData, schema=schema)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+---------------+-----+------+---+-----+
|employee_name|department     |state|salary|age|bonus|
+-------------+---------------+-----+------+---+-----+
|James        |Sales          |NY   |90000 |34 |10000|
|Emma         |Marketing      |CA   |85000 |29 |8000 |
|Michael      |Engineering    |TX   |95000 |40 |12000|
|Sophia       |Human Resources|IL   |80000 |31 |9000 |
|William      |Finance        |NY   |92000 |36 |11000|
|Olivia       |Sales          |CA   |88000 |30 |9500 |
|Daniel       |Marketing      |TX   |86000 |32 |8500 |
|Ava          |Engineering    |CA   |98000 |38 |13000|
|Matthew      |Human Resources|NY   |82000 |27 |7500 |
|Emily        |Finance        |IL   |91000 |35 |10500|
|             |Engineering    |CA   |93000 |33 |125

In [78]:
df.groupBy("department").sum("salary").show()

+---------------+-----------+
|     department|sum(salary)|
+---------------+-----------+
|          Sales|     178000|
|    Engineering|     193000|
|        Finance|     183000|
|      Marketing|     171000|
|Human Resources|     162000|
+---------------+-----------+



In [79]:
df.groupBy("department").mean("salary").show()

+---------------+-----------+
|     department|avg(salary)|
+---------------+-----------+
|          Sales|    89000.0|
|    Engineering|    96500.0|
|        Finance|    91500.0|
|      Marketing|    85500.0|
|Human Resources|    81000.0|
+---------------+-----------+



In [81]:
df.groupBy("state").count().withColumnRenamed("count","StateCount").select("StateCount").show()

+----------+
|StateCount|
+----------+
|         3|
|         2|
|         3|
|         2|
+----------+



UDF User Defined Function in PySpark

In [87]:
df.show()

+-------------+---------------+-----+------+---+-----+
|employee_name|     department|state|salary|age|bonus|
+-------------+---------------+-----+------+---+-----+
|        James|          Sales|   NY| 90000| 34|10000|
|         Emma|      Marketing|   CA| 85000| 29| 8000|
|      Michael|    Engineering|   TX| 95000| 40|12000|
|       Sophia|Human Resources|   IL| 80000| 31| 9000|
|      William|        Finance|   NY| 92000| 36|11000|
|       Olivia|          Sales|   CA| 88000| 30| 9500|
|       Daniel|      Marketing|   TX| 86000| 32| 8500|
|          Ava|    Engineering|   CA| 98000| 38|13000|
|      Matthew|Human Resources|   NY| 82000| 27| 7500|
|        Emily|        Finance|   IL| 91000| 35|10500|
+-------------+---------------+-----+------+---+-----+



In [88]:
from pyspark.sql.functions import udf

In [96]:
@udf("float")
def add_3(x):
  return x * 2.45

In [98]:
df.select(add_3(df.salary)).alias("Performance_Bonus").show()

+-------------+
|add_3(salary)|
+-------------+
|     220500.0|
|     208250.0|
|     232750.0|
|     196000.0|
|     225400.0|
|     215600.0|
|     210700.0|
|     240100.0|
|     200900.0|
|     222950.0|
+-------------+



Pyspark to Pandas

In [99]:
df = df.toPandas()

In [100]:
import pandas as pd


In [102]:
df

Unnamed: 0,employee_name,department,state,salary,age,bonus
0,James,Sales,NY,90000,34,10000
1,Emma,Marketing,CA,85000,29,8000
2,Michael,Engineering,TX,95000,40,12000
3,Sophia,Human Resources,IL,80000,31,9000
4,William,Finance,NY,92000,36,11000
5,Olivia,Sales,CA,88000,30,9500
6,Daniel,Marketing,TX,86000,32,8500
7,Ava,Engineering,CA,98000,38,13000
8,Matthew,Human Resources,NY,82000,27,7500
9,Emily,Finance,IL,91000,35,10500


In [103]:
df.describe()

Unnamed: 0,salary,age,bonus
count,10.0,10.0,10.0
mean,88700.0,33.2,9900.0
std,5638.16361,4.131182,1760.681686
min,80000.0,27.0,7500.0
25%,85250.0,30.25,8625.0
50%,89000.0,33.0,9750.0
75%,91750.0,35.75,10875.0
max,98000.0,40.0,13000.0


Pandas to Pyspark


In [104]:
from pyspark.sql import SQLContext

In [106]:
sqlCtx = SQLContext(spark)
df = sqlCtx.createDataFrame(df).show()



+-------------+---------------+-----+------+---+-----+
|employee_name|     department|state|salary|age|bonus|
+-------------+---------------+-----+------+---+-----+
|        James|          Sales|   NY| 90000| 34|10000|
|         Emma|      Marketing|   CA| 85000| 29| 8000|
|      Michael|    Engineering|   TX| 95000| 40|12000|
|       Sophia|Human Resources|   IL| 80000| 31| 9000|
|      William|        Finance|   NY| 92000| 36|11000|
|       Olivia|          Sales|   CA| 88000| 30| 9500|
|       Daniel|      Marketing|   TX| 86000| 32| 8500|
|          Ava|    Engineering|   CA| 98000| 38|13000|
|      Matthew|Human Resources|   NY| 82000| 27| 7500|
|        Emily|        Finance|   IL| 91000| 35|10500|
+-------------+---------------+-----+------+---+-----+



Removing Null and none values

In [112]:
df.show()

+-------------+---------------+-----+------+---+-----+
|employee_name|     department|state|salary|age|bonus|
+-------------+---------------+-----+------+---+-----+
|        James|          Sales|   NY| 90000| 34|10000|
|         Emma|      Marketing|   CA| 85000| 29| 8000|
|      Michael|    Engineering|   TX| 95000| 40|12000|
|       Sophia|Human Resources|   IL| 80000| 31| 9000|
|      William|        Finance|   NY| 92000| 36|11000|
|       Olivia|          Sales|   CA| 88000| 30| 9500|
|       Daniel|      Marketing|   TX| 86000| 32| 8500|
|          Ava|    Engineering|   CA| 98000| 38|13000|
|      Matthew|Human Resources|   NY| 82000| 27| 7500|
|        Emily|        Finance|   IL| 91000| 35|10500|
|             |    Engineering|   CA| 93000| 33|12500|
|         Liam|           null|   TX| 89000| 37|10000|
|       Oliver|          Sales|     | 86000| 28| 9000|
|       Harper|        Finance|   IL|  null| 29| 9500|
+-------------+---------------+-----+------+---+-----+



In [114]:
df.filter(df.department.isNotNull()).show()

+-------------+---------------+-----+------+---+-----+
|employee_name|     department|state|salary|age|bonus|
+-------------+---------------+-----+------+---+-----+
|        James|          Sales|   NY| 90000| 34|10000|
|         Emma|      Marketing|   CA| 85000| 29| 8000|
|      Michael|    Engineering|   TX| 95000| 40|12000|
|       Sophia|Human Resources|   IL| 80000| 31| 9000|
|      William|        Finance|   NY| 92000| 36|11000|
|       Olivia|          Sales|   CA| 88000| 30| 9500|
|       Daniel|      Marketing|   TX| 86000| 32| 8500|
|          Ava|    Engineering|   CA| 98000| 38|13000|
|      Matthew|Human Resources|   NY| 82000| 27| 7500|
|        Emily|        Finance|   IL| 91000| 35|10500|
|             |    Engineering|   CA| 93000| 33|12500|
|       Oliver|          Sales|     | 86000| 28| 9000|
|       Harper|        Finance|   IL|  null| 29| 9500|
+-------------+---------------+-----+------+---+-----+

