
In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

In [3]:
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [5]:
!pip install -q findspark
import findspark
findspark.init()

In [6]:
from pyspark import SparkContext
sc = SparkContext()

Importing libraries

In [39]:
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import avg, col, concat, expr, lit, struct, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import numpy as np

Spark's in-memory engine can run some workloads up to 100x faster than traditional disk-based systems such as Hadoop MapReduce.
- PySpark works well for data ingestion pipelines.
- PySpark can process data from Hadoop HDFS, AWS S3, and many other file systems.
- PySpark can also process real-time data with Spark's streaming APIs, for example from Kafka.

**Spark Session:**
SparkSession was introduced in Spark 2.0. It is the entry point to the underlying PySpark functionality and is used to programmatically create RDDs and DataFrames.
https://sparkbyexamples.com/pyspark/pyspark-what-is-sparksession/

In [16]:
spark = (SparkSession.builder
                      .appName('SparkByExamples.com') 
                      .getOrCreate())
print(spark)  # creates a session, or returns the one that already exists

<pyspark.sql.session.SparkSession object at 0x7f6e8f3bea60>


In [22]:
# newSession() is an instance method: call it on an existing session
spark2 = spark.newSession()
print(spark2)  # a new session that shares spark's SparkContext


In [13]:
spark3 = (SparkSession.builder.getOrCreate())
print(spark3) #accessing the existing session

<pyspark.sql.session.SparkSession object at 0x7f6e8f3bea60>


PySpark DataFrame

Create an empty RDD

In [14]:
empty_rdd = spark.sparkContext.emptyRDD()
print(empty_rdd)

EmptyRDD[0] at emptyRDD at NativeMethodAccessorImpl.java:0


In [17]:
#create a schema
schema = StructType([
  StructField('firstname', StringType(), True),
  StructField('middlename', StringType(), True),
  StructField('lastname', StringType(), True)
  ])


In [19]:
df = spark.createDataFrame(empty_rdd,schema)
df.show()

+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
+---------+----------+--------+



In [20]:
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)



A simpler way to create an empty DataFrame is to pass an empty list together with the schema:

In [21]:
df1 = spark.createDataFrame([],schema)
df1.show()

+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
+---------+----------+--------+



Convert a Spark DataFrame to pandas

In [25]:
#data
data = [("James","","Smith","36636","M",60000),
        ("Michael","Rose","","40288","M",70000),
        ("Robert","","Williams","42114","",400000),
        ("Maria","Anne","Jones","39192","F",500000),
        ("Jen","Mary","Brown","","F",0)]
# column names
columns = ["first_name","middle_name","last_name","dob","gender","salary"]
df3 = spark3.createDataFrame(data=data, schema=columns)

In [27]:
df3.printSchema()
df3.show()

root
 |-- first_name: string (nullable = true)
 |-- middle_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+----------+-----------+---------+-----+------+------+
|first_name|middle_name|last_name|  dob|gender|salary|
+----------+-----------+---------+-----+------+------+
|     James|           |    Smith|36636|     M| 60000|
|   Michael|       Rose|         |40288|     M| 70000|
|    Robert|           | Williams|42114|      |400000|
|     Maria|       Anne|    Jones|39192|     F|500000|
|       Jen|       Mary|    Brown|     |     F|     0|
+----------+-----------+---------+-----+------+------+



In [29]:
df3.show(2,vertical=True)

-RECORD 0--------------
 first_name  | James   
 middle_name |         
 last_name   | Smith   
 dob         | 36636   
 gender      | M       
 salary      | 60000   
-RECORD 1--------------
 first_name  | Michael 
 middle_name | Rose    
 last_name   |         
 dob         | 40288   
 gender      | M       
 salary      | 70000   
only showing top 2 rows



In [28]:
df4 = df3.toPandas()
print(df4)

  first_name middle_name last_name    dob gender  salary
0      James                 Smith  36636      M   60000
1    Michael        Rose            40288      M   70000
2     Robert              Williams  42114         400000
3      Maria        Anne     Jones  39192      F  500000
4        Jen        Mary     Brown             F       0


**PySpark StructType & StructField** classes are used to programmatically specify the schema of a DataFrame and to create complex columns such as nested struct, array, and map columns. A StructType is a collection of StructFields, each of which defines a column name, a column data type, a boolean that specifies whether the field is nullable, and optional metadata.

In [38]:
# getOrCreate() returns the session already running in this notebook
spark_struct = SparkSession.builder.appName("Struct_type").getOrCreate()
# row data: (firstname, middlename, lastname) tuple, id, gender, salary
structureData = [
    (("James", "", "Smith"), 36636, "M", 3100),
    (("Michael", "Rose", ""), 40288, "M", 4300),
    (("Robert", "", "Williams"), 42114, "M", 1400),
    (("Maria", "Anne", "Jones"), 39192, "F", 5500),
    (("Jen", "Mary", "Brown"), 0, "F", -1),
]
# schema with a nested struct column "name"
schema = StructType([
    StructField("name", StructType([
        StructField("firstname", StringType(), True),
        StructField("middlename", StringType(), True),
        StructField("lastname", StringType(), True),
    ])),
    StructField("id", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True),
])
# create the DataFrame
df_struct = spark_struct.createDataFrame(structureData, schema)
df_struct.show()
df_struct.printSchema()

+--------------------+-----+------+------+
|                name|   id|gender|salary|
+--------------------+-----+------+------+
|    [James, , Smith]|36636|     M|  3100|
|   [Michael, Rose, ]|40288|     M|  4300|
|[Robert, , Williams]|42114|     M|  1400|
|[Maria, Anne, Jones]|39192|     F|  5500|
|  [Jen, Mary, Brown]|    0|     F|    -1|
+--------------------+-----+------+------+

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [43]:
updatedDF = df_struct.withColumn("Other_info",
                      struct(col("id").alias("identifier"),
                             col("gender").alias("gender"),
                             col("salary").alias("salary"),
                             # alias the whole when/otherwise expression to name the field
                             when(col("salary") < 2000, "Low")
                             .when(col("salary") < 4000, "Medium")
                             .otherwise("High")
                             .alias("Salary_grade")))

In [45]:
updatedDF.show(truncate=False)

+--------------------+-----+------+------+------------------------+
|name                |id   |gender|salary|Other_info              |
+--------------------+-----+------+------+------------------------+
|[James, , Smith]    |36636|M     |3100  |[36636, M, 3100, Medium]|
|[Michael, Rose, ]   |40288|M     |4300  |[40288, M, 4300, High]  |
|[Robert, , Williams]|42114|M     |1400  |[42114, M, 1400, Low]   |
|[Maria, Anne, Jones]|39192|F     |5500  |[39192, F, 5500, High]  |
|[Jen, Mary, Brown]  |0    |F     |-1    |[0, F, -1, Low]         |
+--------------------+-----+------+------+------------------------+

