In [1]:
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import *

spark = SparkSession.builder.appName("Pyspark_VS_Pandas").getOrCreate()
conf = spark.sparkContext._conf.setAll([('spark.driver.memory', '2g'), ('spark.executor.memory', '2g'), ('spark.executor.num','3'), ('spark.network.timeout', '1000000')])

## 1). dataframes with default schama

In [3]:
#1). read csv into pandas pyspark dataframes with default schama
dfpd = pd.read_csv("Pandas_vs_Pyspark.csv")
dfps = spark.read.load("Pandas_vs_Pyspark.csv", header='true', inferSchema='true', format='com.databricks.spark.csv' )

#O/P:
print(dfpd.dtypes)
print("\n")
print(dfps.dtypes)
print("\n")
print(dfpd.head())
print("\n")
dfps.show()

Date         object
City         object
Temper        int64
Wind          int64
Humidity    float64
dtype: object


[('Date', 'string'), ('City', 'string'), ('Temper', 'int'), ('Wind', 'int'), ('Humidity', 'double')]


         Date      City  Temper  Wind  Humidity
0  01-01-2019  Banglore      35    15      10.5
1  01-02-2019  Banglore      34    14       9.9
2  01-03-2019  Banglore      33    13       7.9
3  01-04-2019  Banglore      36    16       4.5
4  01-05-2019       HYD      45     8       9.0


+----------+--------+------+----+--------+
|      Date|    City|Temper|Wind|Humidity|
+----------+--------+------+----+--------+
|01-01-2019|Banglore|    35|  15|    10.5|
|01-02-2019|Banglore|    34|  14|     9.9|
|01-03-2019|Banglore|    33|  13|     7.9|
|01-04-2019|Banglore|    36|  16|     4.5|
|01-05-2019|     HYD|    45|   8|     9.0|
|01-06-2019|     HYD|    47|   7|     7.1|
|01-07-2019|     HYD|    49|   8|     8.4|
|01-08-2019| Chennai|    50|   2|     7.8|
|01-09-2019| Chenn

## 2). dataframes with defined schama

In [4]:
#2). read csv into pandas pyspark dataframes with defined schama
dfpd = pd.read_csv("Pandas_vs_Pyspark.csv", parse_dates=["Date"])
# dfpd = dfpd.astype(str) #1--> convert all columns to object datatype

# col_schema = {"Date":'datetime64', "City":'str', "Temper":'int64', "Wind":'int64', "Humidity":'float64'}
# dfpd = dfpd.astype(col_schema) #2--> convert all columns key value paired

dfpd = dfpd.infer_objects() #3). atomatically detect all datatypes 
print(dfpd.dtypes)
########################################

# dfps = spark.read.load("Pandas_vs_Pyspark.csv", header='true', inferSchema='true', format='com.databricks.spark.csv' )
schema = StructType([\
        StructField("Date", DateType()),\
        StructField("City", StringType()), \
        StructField("Temper", IntegerType()),\
        StructField("Wind", IntegerType()),\
        StructField("Humidity", DoubleType())])
dfps = spark.read.csv("Pandas_vs_Pyspark.csv", header='true', schema=schema )

print("\n")
print(dfps.dtypes)

Date        datetime64[ns]
City                object
Temper               int64
Wind                 int64
Humidity           float64
dtype: object


[('Date', 'date'), ('City', 'string'), ('Temper', 'int'), ('Wind', 'int'), ('Humidity', 'double')]


## 3). Schema information 

In [5]:
#3). Schema information 
print(dfpd.info())
print("\n")
print(dfps.printSchema())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 5 columns):
Date        10 non-null datetime64[ns]
City        10 non-null object
Temper      10 non-null int64
Wind        10 non-null int64
Humidity    10 non-null float64
dtypes: datetime64[ns](1), float64(1), int64(2), object(1)
memory usage: 480.0+ bytes
None


root
 |-- Date: date (nullable = true)
 |-- City: string (nullable = true)
 |-- Temper: integer (nullable = true)
 |-- Wind: integer (nullable = true)
 |-- Humidity: double (nullable = true)

None


## 4). Count of datafrmae, Shape, head, show

In [6]:
#4). Count of datafrmae
print("###Count###")
print(dfpd.count())
print(dfps.count())

print("###shape###")
print(dfpd.shape)
print(dfps.count(), len(dfps.columns))

print("###head, first###")
print(dfpd.head(5))
print(dfps.show(5))

###Count###
Date        10
City        10
Temper      10
Wind        10
Humidity    10
dtype: int64
10
###shape###
(10, 5)
10 5
###head, first###
        Date      City  Temper  Wind  Humidity
0 2019-01-01  Banglore      35    15      10.5
1 2019-01-02  Banglore      34    14       9.9
2 2019-01-03  Banglore      33    13       7.9
3 2019-01-04  Banglore      36    16       4.5
4 2019-01-05       HYD      45     8       9.0
+----------+--------+------+----+--------+
|      Date|    City|Temper|Wind|Humidity|
+----------+--------+------+----+--------+
|0006-07-12|Banglore|    35|  15|    10.5|
|0006-08-12|Banglore|    34|  14|     9.9|
|0006-09-09|Banglore|    33|  13|     7.9|
|0006-10-10|Banglore|    36|  16|     4.5|
|0006-11-09|     HYD|    45|   8|     9.0|
+----------+--------+------+----+--------+
only showing top 5 rows

None


## 5). Describe

In [7]:
#5). Describe
print(dfpd.describe())
print(dfps.describe().show())

          Temper       Wind   Humidity
count  10.000000  10.000000  10.000000
mean   43.500000   8.700000   8.210000
std     8.154753   5.578729   1.693419
min    33.000000   1.000000   4.500000
25%    35.250000   4.000000   7.650000
50%    46.000000   8.000000   8.150000
75%    49.750000  13.750000   9.300000
max    54.000000  16.000000  10.500000
+-------+--------+-----------------+-----------------+------------------+
|summary|    City|           Temper|             Wind|          Humidity|
+-------+--------+-----------------+-----------------+------------------+
|  count|      10|               10|               10|                10|
|   mean|    null|             43.5|              8.7| 8.209999999999999|
| stddev|    null|8.154753215150043|5.578729445153459|1.6934186330221677|
|    min|Banglore|               33|                1|               4.5|
|    max|     HYD|               54|               16|              10.5|
+-------+--------+-----------------+-----------------+---

## 6). Create DataFrame using colletion

In [49]:
#6). Create DataFrame using colletion
dict1 = {"Name": ["A","B","C","D","E","F","G","H","I"],\
         "Weight":[70,61,83,60,92,69,84,71,77],\
         "Address":["HYD","Banglore","Chennai","Mumbai","Banglore","Mumbai","Chennai","Banglore","HYD"],\
         "DOB":["15-01-1990", "19-01-1996", "28-02-1999", "13-06-1989", "15-11-2000", "10-12-1995", "25-11-1998", "15-09-1994", "15-01-1996"],\
         "Batch":[2016, 2017, 2018, 2016, 2016, 2017, 2016, 2018, 2017],\
         "Salary":[51000.00, 46500.50, 52000.00, 51000.00, 52000.00, 75000.60, 64000.50, 52000.00, 46500.50]         
        }

##
dfpd = pd.DataFrame(dict1)
dfpd_dtype = {"Name":'str', "Weight":'int64', "Address":'str', "DOB":'datetime64', "Batch":'int64', "Salary":'float64' }
dfpd = dfpd.astype(dfpd_dtype)

##
schema = StructType([\
                     StructField("Name", StringType(), True),\
                     StructField("Weight", IntegerType(), True),\
                     StructField("Address", StringType(), True),\
                     StructField("DOB", DateType(), True),\
                     StructField("Batch", IntegerType(), True),\
                     StructField("Salary", DoubleType(), True)])


dfps = spark.createDataFrame(dfpd, schema)
print(dfpd.head(10))
print(dfps.show())

  Name  Weight   Address        DOB  Batch   Salary
0    A      70       HYD 1990-01-15   2016  51000.0
1    B      61  Banglore 1996-01-19   2017  46500.5
2    C      83   Chennai 1999-02-28   2018  52000.0
3    D      60    Mumbai 1989-06-13   2016  51000.0
4    E      92  Banglore 2000-11-15   2016  52000.0
5    F      69    Mumbai 1995-10-12   2017  75000.6
6    G      84   Chennai 1998-11-25   2016  64000.5
7    H      71  Banglore 1994-09-15   2018  52000.0
8    I      77       HYD 1996-01-15   2017  46500.5
+----+------+--------+----------+-----+-------+
|Name|Weight| Address|       DOB|Batch| Salary|
+----+------+--------+----------+-----+-------+
|   A|    70|     HYD|1990-01-15| 2016|51000.0|
|   B|    61|Banglore|1996-01-19| 2017|46500.5|
|   C|    83| Chennai|1999-02-28| 2018|52000.0|
|   D|    60|  Mumbai|1989-06-13| 2016|51000.0|
|   E|    92|Banglore|2000-11-15| 2016|52000.0|
|   F|    69|  Mumbai|1995-10-12| 2017|75000.6|
|   G|    84| Chennai|1998-11-25| 2016|64000.5|


## 7). Add New column to existing datafrmae

In [50]:
#7). Add New column to existing datafrmae
dfpd["Salary_Hike"] = dfpd["Salary"]
dfpd["Salary_Hike_1year"] = dfpd["Salary"]+10000
dfpd["Salary_Hike_2year"] = dfpd["Salary"]+20000

dfps = dfps.withColumn("Salary_Hike", col("Salary"))
dfps = dfps.withColumn("Salary_Hike_1year", col("Salary")+10000)
dfps = dfps.withColumn("Salary_Hike_2year", col("Salary")+20000)

print(dfpd.head())
print(dfps.show())

  Name  Weight   Address        DOB  Batch   Salary  Salary_Hike  \
0    A      70       HYD 1990-01-15   2016  51000.0      51000.0   
1    B      61  Banglore 1996-01-19   2017  46500.5      46500.5   
2    C      83   Chennai 1999-02-28   2018  52000.0      52000.0   
3    D      60    Mumbai 1989-06-13   2016  51000.0      51000.0   
4    E      92  Banglore 2000-11-15   2016  52000.0      52000.0   

   Salary_Hike_1year  Salary_Hike_2year  
0            61000.0            71000.0  
1            56500.5            66500.5  
2            62000.0            72000.0  
3            61000.0            71000.0  
4            62000.0            72000.0  
+----+------+--------+----------+-----+-------+-----------+-----------------+-----------------+
|Name|Weight| Address|       DOB|Batch| Salary|Salary_Hike|Salary_Hike_1year|Salary_Hike_2year|
+----+------+--------+----------+-----+-------+-----------+-----------------+-----------------+
|   A|    70|     HYD|1990-01-15| 2016|51000.0|    

## 8). Rename Columns

In [51]:
#8). Rename Columns
dfpd = dfpd.rename(columns={"Salary_Hike":"DeleteThisColumn"})
dfps = dfps.withColumnRenamed("Salary_Hike","DeleteThisColumn")
print(dfpd.head())
print(dfps.show())

  Name  Weight   Address        DOB  Batch   Salary  DeleteThisColumn  \
0    A      70       HYD 1990-01-15   2016  51000.0           51000.0   
1    B      61  Banglore 1996-01-19   2017  46500.5           46500.5   
2    C      83   Chennai 1999-02-28   2018  52000.0           52000.0   
3    D      60    Mumbai 1989-06-13   2016  51000.0           51000.0   
4    E      92  Banglore 2000-11-15   2016  52000.0           52000.0   

   Salary_Hike_1year  Salary_Hike_2year  
0            61000.0            71000.0  
1            56500.5            66500.5  
2            62000.0            72000.0  
3            61000.0            71000.0  
4            62000.0            72000.0  
+----+------+--------+----------+-----+-------+----------------+-----------------+-----------------+
|Name|Weight| Address|       DOB|Batch| Salary|DeleteThisColumn|Salary_Hike_1year|Salary_Hike_2year|
+----+------+--------+----------+-----+-------+----------------+-----------------+-----------------+
|   A|

## 9).Delete Columns

In [52]:
#9).Delete Columns
dfpd = dfpd.drop("DeleteThisColumn", axis=1)
dfps = dfps.drop("DeleteThisColumn")

print(dfpd.head())
print(dfps.show())

  Name  Weight   Address        DOB  Batch   Salary  Salary_Hike_1year  \
0    A      70       HYD 1990-01-15   2016  51000.0            61000.0   
1    B      61  Banglore 1996-01-19   2017  46500.5            56500.5   
2    C      83   Chennai 1999-02-28   2018  52000.0            62000.0   
3    D      60    Mumbai 1989-06-13   2016  51000.0            61000.0   
4    E      92  Banglore 2000-11-15   2016  52000.0            62000.0   

   Salary_Hike_2year  
0            71000.0  
1            66500.5  
2            72000.0  
3            71000.0  
4            72000.0  
+----+------+--------+----------+-----+-------+-----------------+-----------------+
|Name|Weight| Address|       DOB|Batch| Salary|Salary_Hike_1year|Salary_Hike_2year|
+----+------+--------+----------+-----+-------+-----------------+-----------------+
|   A|    70|     HYD|1990-01-15| 2016|51000.0|          61000.0|          71000.0|
|   B|    61|Banglore|1996-01-19| 2017|46500.5|          56500.5|          66500.

## 10). Select Multiple columns

In [53]:
#10). Select Multiple columns
print(dfpd[['Address', 'DOB', 'Batch']].head(5))
print(dfps.select('Address', 'DOB', 'Batch').show(5))

    Address        DOB  Batch
0       HYD 1990-01-15   2016
1  Banglore 1996-01-19   2017
2   Chennai 1999-02-28   2018
3    Mumbai 1989-06-13   2016
4  Banglore 2000-11-15   2016
+--------+----------+-----+
| Address|       DOB|Batch|
+--------+----------+-----+
|     HYD|1990-01-15| 2016|
|Banglore|1996-01-19| 2017|
| Chennai|1999-02-28| 2018|
|  Mumbai|1989-06-13| 2016|
|Banglore|2000-11-15| 2016|
+--------+----------+-----+
only showing top 5 rows

None


## 11). Filter Conditions

In [54]:
#11). Filter Conditions
dfpd1 = dfpd[(dfpd["Address"]=="Banglore") & (dfpd["Weight"]>70)]
dfps1 = dfps.filter((col("Address")=="Banglore") & (col("Weight")>70))

print(dfpd1.head())
print(dfps1.show())

  Name  Weight   Address        DOB  Batch   Salary  Salary_Hike_1year  \
4    E      92  Banglore 2000-11-15   2016  52000.0            62000.0   
7    H      71  Banglore 1994-09-15   2018  52000.0            62000.0   

   Salary_Hike_2year  
4            72000.0  
7            72000.0  
+----+------+--------+----------+-----+-------+-----------------+-----------------+
|Name|Weight| Address|       DOB|Batch| Salary|Salary_Hike_1year|Salary_Hike_2year|
+----+------+--------+----------+-----+-------+-----------------+-----------------+
|   E|    92|Banglore|2000-11-15| 2016|52000.0|          62000.0|          72000.0|
|   H|    71|Banglore|1994-09-15| 2018|52000.0|          62000.0|          72000.0|
+----+------+--------+----------+-----+-------+-----------------+-----------------+

None


## 12). Sorting

In [55]:
#12). Sorting
dfpd3 = dfpd.sort_values(by=['Address', 'Salary'], ascending=[True, False])

dfps3 = dfps.orderBy("Address","Salary", ascending=[True, False])

print(dfpd3.head(20))
print(dfps3.show())

  Name  Weight   Address        DOB  Batch   Salary  Salary_Hike_1year  \
4    E      92  Banglore 2000-11-15   2016  52000.0            62000.0   
7    H      71  Banglore 1994-09-15   2018  52000.0            62000.0   
1    B      61  Banglore 1996-01-19   2017  46500.5            56500.5   
6    G      84   Chennai 1998-11-25   2016  64000.5            74000.5   
2    C      83   Chennai 1999-02-28   2018  52000.0            62000.0   
0    A      70       HYD 1990-01-15   2016  51000.0            61000.0   
8    I      77       HYD 1996-01-15   2017  46500.5            56500.5   
5    F      69    Mumbai 1995-10-12   2017  75000.6            85000.6   
3    D      60    Mumbai 1989-06-13   2016  51000.0            61000.0   

   Salary_Hike_2year  
4            72000.0  
7            72000.0  
1            66500.5  
6            84000.5  
2            72000.0  
0            71000.0  
8            66500.5  
5            95000.6  
3            71000.0  
+----+------+--------+-------

## 13). GroupBy aggregate functions:

In [56]:
#13). GroupBy aggregate functions:
dfpd4 = dfpd.groupby(["Address"]).agg({"Salary": [np.size, np.sum, np.mean, np.min, np.max]})\
.rename(columns={"size":"Count", "sum":"Total", "mean":"Average", "amin":"Minimum", "amax":"Maximum"})

#dfpd4 = dfpd.groupby(["Address"])["Salary"].mean()    #--> for specific column aggregate functions
#dfpd4 = dfpd.groupby(["Address"]).mean()    #--> for all columns aggregate functions

dfps4 = dfps.groupBy(col("Address")).agg({"Salary":'min'}) #--> 1column

# expr = [min(x) for x in dfps.columns]  #--> for all columns
# dfps.groupBy(col("Address")).agg(*expr).show()

print(dfpd4.head())
print(dfps4.show())

         Salary                                          
          Count     Total       Average  Minimum  Maximum
Address                                                  
Banglore    3.0  150500.5  50166.833333  46500.5  52000.0
Chennai     2.0  116000.5  58000.250000  52000.0  64000.5
HYD         2.0   97500.5  48750.250000  46500.5  51000.0
Mumbai      2.0  126000.6  63000.300000  51000.0  75000.6
+--------+-----------+
| Address|min(Salary)|
+--------+-----------+
| Chennai|    52000.0|
|  Mumbai|    51000.0|
|     HYD|    46500.5|
|Banglore|    46500.5|
+--------+-----------+

None


## 14). Row Number, Rank

In [60]:
#14). Row Number, Rank
# SQL--> ##SELECT A.*, ROW_NUMBER() OVER(PARTITION BY ADDRESS ORDER BY SALARY DESC)AS RN FROM DF A
# dfpd["RN"] = dfpd.sort_values(by=["Salary"], ascending=False).groupby("Address").cumcount()+1
# dfpd.sort_values(by=["Address","RN"]).head(100)

dfpd["RN"] = dfpd.groupby(["Address"])["Salary"].rank(method='max',  ascending=False)
dfpd.sort_values(by=["Address","RN"]).head(100)  #method=min,max,average,first,dense
#first --> ROW_NUMBER() in SQL, #max, min--> Rank() in SQL  #dense --> Dense_Rank() in SQL


dfps5 = dfps.withColumn("RN", rank().over(Window.partitionBy("Address").orderBy(col("Salary").desc()))).show() 
#row_number(), rank(), dense_rank()

Unnamed: 0,Name,Weight,Address,DOB,Batch,Salary,Salary_Hike_1year,Salary_Hike_2year,RN
4,E,92,Banglore,2000-11-15,2016,52000.0,62000.0,72000.0,2.0
7,H,71,Banglore,1994-09-15,2018,52000.0,62000.0,72000.0,2.0
1,B,61,Banglore,1996-01-19,2017,46500.5,56500.5,66500.5,3.0
6,G,84,Chennai,1998-11-25,2016,64000.5,74000.5,84000.5,1.0
2,C,83,Chennai,1999-02-28,2018,52000.0,62000.0,72000.0,2.0
0,A,70,HYD,1990-01-15,2016,51000.0,61000.0,71000.0,1.0
8,I,77,HYD,1996-01-15,2017,46500.5,56500.5,66500.5,2.0
5,F,69,Mumbai,1995-10-12,2017,75000.6,85000.6,95000.6,1.0
3,D,60,Mumbai,1989-06-13,2016,51000.0,61000.0,71000.0,2.0


In [40]:
dfps3.createOrReplaceTempView("DFPS3")
abcd = spark.sql("SELECT *FROM DFPS3 WHERE NAME='A' ")
abcd.show()

+----+------+-------+----------+-----+-------+-----------------+-----------------+
|Name|Weight|Address|       DOB|Batch| Salary|Salary_Hike_1year|Salary_Hike_2year|
+----+------+-------+----------+-----+-------+-----------------+-----------------+
|   A|    70|    HYD|1990-01-15| 2016|51000.0|          61000.0|          71000.0|
+----+------+-------+----------+-----+-------+-----------------+-----------------+



In [55]:
spark.stop()