
#### DataFrame
A PySpark DataFrame resembles a pandas DataFrame, with one key difference: a pandas DataFrame holds its data on a single machine, whereas a PySpark DataFrame is distributed across the machines of a cluster, and operations on it run in parallel on those machines. PySpark DataFrames are lazily evaluated and are implemented on top of RDDs.
A PySpark application starts by initializing a SparkSession, the entry point to PySpark, as shown below.

In [None]:
!pip install pyspark

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

#### DataFrame Creation

In [3]:
data = [('Tom','M',3000),
  ('Neal','M',4000),
  ('Robert','M',4000),
  ('Aina','F',4000),
  ('Mary','F',5000)
]
columns = ['name','gender','salary']
spark_df = spark.createDataFrame(data=data,schema=columns)
spark_df.show()

+------+------+------+
|  name|gender|salary|
+------+------+------+
|   Tom|     M|  3000|
|  Neal|     M|  4000|
|Robert|     M|  4000|
|  Aina|     F|  4000|
|  Mary|     F|  5000|
+------+------+------+



In [4]:
spark_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



Creating a Spark DataFrame from a pandas DataFrame

In [5]:
import pandas as pd
pandas_df = pd.DataFrame(data=data,columns=columns)
spark_df = spark.createDataFrame(pandas_df)
spark_df.show(2)

+----+------+------+
|name|gender|salary|
+----+------+------+
| Tom|     M|  3000|
|Neal|     M|  4000|
+----+------+------+
only showing top 2 rows



In [6]:
spark_df.show(2, vertical=True)

-RECORD 0------
 name   | Tom  
 gender | M    
 salary | 3000 
-RECORD 1------
 name   | Neal 
 gender | M    
 salary | 4000 
only showing top 2 rows



In [7]:
spark_df.columns

['name', 'gender', 'salary']

DataFrame.collect() returns all of the distributed data to the driver as a local Python list of Row objects.
Because every row is moved from the executors to the driver, this can raise an out-of-memory error when the dataset is too large to fit in driver memory.

In [8]:
spark_df.collect()

[Row(name='Tom', gender='M', salary=3000),
 Row(name='Neal', gender='M', salary=4000),
 Row(name='Robert', gender='M', salary=4000),
 Row(name='Aina', gender='F', salary=4000),
 Row(name='Mary', gender='F', salary=5000)]

To avoid an out-of-memory error, fetch only a bounded number of rows with DataFrame.take() or DataFrame.tail().

In [9]:
spark_df.take(3)

[Row(name='Tom', gender='M', salary=3000),
 Row(name='Neal', gender='M', salary=4000),
 Row(name='Robert', gender='M', salary=4000)]

In [10]:
spark_df.tail(2)

[Row(name='Aina', gender='F', salary=4000),
 Row(name='Mary', gender='F', salary=5000)]

Convert a PySpark DataFrame to a pandas DataFrame

In [11]:
spark_df.toPandas()

     name  gender  salary
0     Tom       M    3000
1    Neal       M    4000
2  Robert       M    4000
3    Aina       F    4000
4    Mary       F    5000


PySpark DataFrames are lazily evaluated: selecting a column does not trigger any computation; it simply returns a Column instance.

In [12]:
spark_df.gender

Column<'gender'>

In [13]:
from pyspark.sql import Column
from pyspark.sql.functions import lower

type(spark_df.name) == type(lower(spark_df.name)) == type(spark_df.name.isNull())

True

In [14]:
type(lower(spark_df.name))

pyspark.sql.column.Column

DataFrame.select() takes Column instances and returns another DataFrame.

In [15]:
spark_df.select(spark_df.gender).show()

+------+
|gender|
+------+
|     M|
|     M|
|     M|
|     F|
|     F|
+------+



Use DataFrame.withColumn() to add a new column derived from a Column expression.

In [16]:
spark_df.withColumn('lower_gen', lower(spark_df.gender)).show()

+------+------+------+---------+
|  name|gender|salary|lower_gen|
+------+------+------+---------+
|   Tom|     M|  3000|        m|
|  Neal|     M|  4000|        m|
|Robert|     M|  4000|        m|
|  Aina|     F|  4000|        f|
|  Mary|     F|  5000|        f|
+------+------+------+---------+



To select a subset of rows, use DataFrame.filter().

In [17]:
spark_df.filter(spark_df.salary<4000).show()

+----+------+------+
|name|gender|salary|
+----+------+------+
| Tom|     M|  3000|
+----+------+------+



Grouping Data

In [18]:
spark_df.groupBy('gender').avg().show()

+------+------------------+
|gender|       avg(salary)|
+------+------------------+
|     M|3666.6666666666665|
|     F|            4500.0|
+------+------------------+



#### Working with SQL
The DataFrame API and Spark SQL share the same execution engine, so they can be used interchangeably. For example, you can register a DataFrame as a temporary view and query it with SQL, as shown below:

In [19]:
spark_df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|       5|
+--------+



In [20]:
spark.sql("Select * from tableA").show()

+------+------+------+
|  name|gender|salary|
+------+------+------+
|   Tom|     M|  3000|
|  Neal|     M|  4000|
|Robert|     M|  4000|
|  Aina|     F|  4000|
|  Mary|     F|  5000|
+------+------+------+



In [21]:
spark.sql("Select * from tableA where gender='M'").show()

+------+------+------+
|  name|gender|salary|
+------+------+------+
|   Tom|     M|  3000|
|  Neal|     M|  4000|
|Robert|     M|  4000|
+------+------+------+



#### PySpark UDFs (User-Defined Functions)
PySpark UDFs are similar to UDFs in traditional databases. You write a function in ordinary Python, then either wrap it with udf() from pyspark.sql.functions to use it on a DataFrame, or register it with spark.udf.register() to use it in SQL.
#### UDF with the @udf decorator

In [22]:
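from pyspark.sql.types import IntegerType,StringType
from pyspark.sql.functions import udf

# Despite its name, add_one adds 1000 to its input (see the output below).
@udf(returnType=IntegerType())
def add_one(value):  # parameter renamed from `int`, which shadowed the built-in
    return value + 1000

# Register the UDF under a SQL-visible name so it can be called from queries.
spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(salary) FROM tableA").show()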

+---------------+
|add_one(salary)|
+---------------+
|           4000|
|           5000|
|           5000|
|           5000|
|           6000|
+---------------+



#### Making a UDF from a plain Python function

In [23]:
def str_lower(x):
  return x.lower()

In [24]:
udf_lower = udf(str_lower)  # wrap the function directly; the return type defaults to StringType

In [25]:
spark_df.withColumn("Special Names", udf_lower("name")).show()

+------+------+------+-------------+
|  name|gender|salary|Special Names|
+------+------+------+-------------+
|   Tom|     M|  3000|          tom|
|  Neal|     M|  4000|         neal|
|Robert|     M|  4000|       robert|
|  Aina|     F|  4000|         aina|
|  Mary|     F|  5000|         mary|
+------+------+------+-------------+

