### In this activity we will understand
- What are DataFrames in Spark ?
- Different ways to create a DataFrames
- What are Spark Transformations & Actions
- Verify Summary Statistics
- Spark SQL
- Performance Comparison of Spark DataFrame and Spark SQL
- Column References
- Converting to Spark Types - Literals
- Add/Rename/Remove Columns
- TypeCasting
- Column differences
- Pair-wise frequencies
- Remove duplicates
- Working with Nulls
- Filtering the rows
- Aggregations
- Joins
- Random Samples
- Random Splits
- Map Transformations
- Sorting
- Union
- String Manipulations
- Regular Expressions
- Working with Dates and Time Stamp
- User Defined Functions
- Broadcase variables and Accumulators
- Handling Different Data Sources
- Enabling for Conversion to/from Pandas
- Pandas UDFs (a.k.a. Vectorized UDFs)
- Scalar
- Grouped Map

In [0]:
myRangeDF = spark.range(100).toDF('number')

In [0]:
myRangeDF

DataFrame[number: bigint]

In [0]:
myRangeDF.show(4)

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
+------+
only showing top 4 rows



In [0]:
nameDF = spark.createDataFrame([[1,'Alice',30],
                                [2,'Beneth',28],
                                [3,'Charlie',29],
                                [4,'Aligarh',31]],['Id','Name','Age'])

In [0]:
nameDF.show(3)

+---+-------+---+
| Id|   Name|Age|
+---+-------+---+
|  1|  Alice| 30|
|  2| Beneth| 28|
|  3|Charlie| 29|
+---+-------+---+
only showing top 3 rows



In [0]:
myRangeDF.count()

100

In [0]:
evenDF = myRangeDF.where('number % 2 == 0')

In [0]:
evenDF

DataFrame[number: bigint]

In [0]:
evenDF.show(3)

+------+
|number|
+------+
|     0|
|     2|
|     4|
+------+
only showing top 3 rows



In [0]:
evenDF.count()

50

In [0]:
sc = spark.sparkContext

In [0]:
sc

#### Infer Schema by using RDDs

In [0]:
tempRDD = sc.textFile('/temp_data.txt')

In [0]:
tempRDD.count()

13131

In [0]:
type(tempRDD)

pyspark.rdd.RDD

In [0]:
tempRDD.take(3)

['1901\t-78\t1', '1901\t-72\t1', '1901\t-94\t1']

In [0]:
splitRDD = tempRDD.map(lambda record : record.split('\t'))
splitRDD.take(3)

[['1901', '-78', '1'], ['1901', '-72', '1'], ['1901', '-94', '1']]

In [0]:
from pyspark.sql import Row

In [0]:
schemaRDD = splitRDD.map(lambda line : Row(year=line[0],temp=int(line[1]),status=int(line[2])))

In [0]:
schemaRDD.take(3)

[Row(year='1901', temp=-78, status=1),
 Row(year='1901', temp=-72, status=1),
 Row(year='1901', temp=-94, status=1)]

In [0]:
tempDF = spark.createDataFrame(schemaRDD)
tempDF.show(3)

+----+----+------+
|year|temp|status|
+----+----+------+
|1901| -78|     1|
|1901| -72|     1|
|1901| -94|     1|
+----+----+------+
only showing top 3 rows



In [0]:
tempDF.printSchema()

root
 |-- year: string (nullable = true)
 |-- temp: long (nullable = true)
 |-- status: long (nullable = true)



In [0]:
### Read test.csv as Rdd and convert it to Dataframe
testRDD = sc.textFile('/test.csv')
testRDD.count()

233600

In [0]:
testRDD.take(2)

['User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,Product_Category_3',
 '1000004,P00128942,M,46-50,7,B,2,1,1,11,']

In [0]:
header = testRDD.first()
testRDD = testRDD.filter(lambda line : line != header)
print('After the header record is removed ')
testRDD.first()

After the header record is removed 


'1000004,P00128942,M,46-50,7,B,2,1,1,11,'

In [0]:
### Split the data based on the separator - ','
splitRDD = testRDD.map(lambda line : line.split(','))
print("Afer splitting the records are : \n")
splitRDD.take(2)

Afer splitting the records are : 



[['1000004', 'P00128942', 'M', '46-50', '7', 'B', '2', '1', '1', '11', ''],
 ['1000009', 'P00113442', 'M', '26-35', '17', 'C', '0', '0', '3', '5', '']]

In [0]:
from pyspark.sql.types import * 
testRDDSchema = StructType([
      StructField('User_Id',StringType(),True),
      StructField('ProductId',StringType(),True),
      StructField('Gender',StringType(),True),
      StructField('Age',StringType(),True),
      StructField('Occupation',StringType(),True),
      StructField('City_Category',StringType(),True),
      StructField('Stay_In_Current_City_Years',StringType(),True),
      StructField('Marital_Status',StringType(),True),
      StructField('Product_Category_1',StringType(),True),
      StructField('Product_Category_2',StringType(),True),
      StructField('Product_Category_3',StringType(),True),
])

In [0]:
testDF = spark.createDataFrame(data=splitRDD,schema=testRDDSchema)

In [0]:
testDF.show(3)

+-------+---------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|User_Id|ProductId|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|
+-------+---------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|1000004|P00128942|     M|46-50|         7|            B|                         2|             1|                 1|                11|                  |
|1000009|P00113442|     M|26-35|        17|            C|                         0|             0|                 3|                 5|                  |
|1000010|P00288442|     F|36-45|         1|            B|                        4+|             1|                 5|                14|                  |
+-------+---------+------+-----+----------+-------------+-

In [0]:
testDF.count()

233599

In [0]:
testSample = testDF.sample(False,0.1,33)

In [0]:
testSample.count()

23221

In [0]:
trainDF = spark.read.format('csv').option('header','true').option('inferSchema','true').load('/home/jayantm/Spark/SparkSQL/Data/train.csv')

In [0]:
trainDF

DataFrame[User_ID: int, Product_ID: string, Gender: string, Age: string, Occupation: int, City_Category: string, Stay_In_Current_City_Years: string, Marital_Status: int, Product_Category_1: int, Product_Category_2: int, Product_Category_3: int, Purchase: int]

In [0]:
trainDF.show(3)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|              null|              null|    1422|
+---

In [0]:
trainSample = trainDF.sample(False,0.1,435)
trainSample.count()

55007

In [0]:
trainSamplePD = trainSample.toPandas()
print(type(trainSample))
print(type(trainSamplePD))

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pandas.core.frame.DataFrame'>


In [0]:
trainSamplePD.to_csv('/train_sample.csv')

In [0]:
trainDF.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



In [0]:
trainDF.head(2)

[Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370),
 Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200)]

In [0]:
trainDF.show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only

In [0]:
print('Number of records in training dataset {}\n'.format(trainDF.count()))
print('Number of records in testing dataset {}\n'.format(testDF.count()))

Number of records in training dataset 550068

Number of records in testing dataset 233599



In [0]:
trainDF.describe().show()

+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|           User_ID|Product_ID|Gender|   Age|       Occupation|City_Category|Stay_In_Current_City_Years|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|         Purchase|
+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|  count|            550068|    550068|550068|550068|           550068|       550068|                    550068|             550068|            550068|            376430|            166821|           550068|
|   mean|1003028.8424013031|      null|  null|  null|8.076706879876669|         null|         1.468494139793958|0.40965298835780306| 5.404270017525106| 9.84232925112238

In [0]:
testDF.describe().show()

+-------+------------------+---------+------+------+-----------------+-------------+--------------------------+------------------+------------------+------------------+------------------+
|summary|           User_Id|ProductId|Gender|   Age|       Occupation|City_Category|Stay_In_Current_City_Years|    Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|
+-------+------------------+---------+------+------+-----------------+-------------+--------------------------+------------------+------------------+------------------+------------------+
|  count|            233599|   233599|233599|233599|           233599|       233599|                    233599|            233599|            233599|            233599|            233599|
|   mean|1003029.3568594044|     null|  null|  null|8.085407043694536|         null|        1.4682778997642345|0.4100702485883929| 5.276542279718663| 9.849586059346997|12.669453946534905|
| stddev|  1726.50496799554|     null|  null|  null|6.521146

In [0]:
trainDF.describe('Purchase').show()

+-------+-----------------+
|summary|         Purchase|
+-------+-----------------+
|  count|           550068|
|   mean|9263.968712959126|
| stddev|5023.065393820575|
|    min|               12|
|    max|            23961|
+-------+-----------------+



In [0]:
trainDF.createOrReplaceTempView('trainDFTable')

In [0]:
trainDF.show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only

In [0]:
#rdd1.map().flatMap.reduceByKey()
spark.sql("SELECT * FROM trainDFTable LIMIT 2")

DataFrame[User_ID: int, Product_ID: string, Gender: string, Age: string, Occupation: int, City_Category: string, Stay_In_Current_City_Years: string, Marital_Status: int, Product_Category_1: int, Product_Category_2: int, Product_Category_3: int, Purchase: int]

In [0]:
spark.sql("SELECT * FROM trainDFTable LIMIT 2").show()

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+



In [0]:
#### Physical plan being executed using Spark DataFrame vs Spark SQL
dataFrameWay = trainDF.groupBy('Age').count()

In [0]:
dataFrameWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[Age#118], functions=[count(1)])
+- Exchange hashpartitioning(Age#118, 200), ENSURE_REQUIREMENTS, [id=#289]
   +- *(1) HashAggregate(keys=[Age#118], functions=[partial_count(1)])
      +- FileScan csv [Age#118] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/jayantm/Spark/SparkSQL/Data/train.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Age:string>




In [0]:
sqlWay = spark.sql("SELECT Age,count(1) from trainDFTable GROUP BY Age")


In [0]:
sqlWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[Age#118], functions=[count(1)])
+- Exchange hashpartitioning(Age#118, 200), ENSURE_REQUIREMENTS, [id=#308]
   +- *(1) HashAggregate(keys=[Age#118], functions=[partial_count(1)])
      +- FileScan csv [Age#118] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/jayantm/Spark/SparkSQL/Data/train.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Age:string>




In [0]:
from pyspark.sql.functions import expr,col,column
dfWay = trainDF.filter(col('Age') != '0-17').groupBy('Age').count()

In [0]:
dfWay.show()

+-----+------+
|  Age| count|
+-----+------+
|18-25| 99660|
|26-35|219587|
|46-50| 45701|
|51-55| 38501|
|36-45|110013|
|  55+| 21504|
+-----+------+



In [0]:
trainDF.select(expr("User_ID AS userID"),col("User_ID"),"User_ID").show(3)

+-------+-------+-------+
| userID|User_ID|User_ID|
+-------+-------+-------+
|1000001|1000001|1000001|
|1000001|1000001|1000001|
|1000001|1000001|1000001|
+-------+-------+-------+
only showing top 3 rows



In [0]:
trainDF.select(expr("User_ID as userID")).show(2)

+-------+
| userID|
+-------+
|1000001|
|1000001|
+-------+
only showing top 2 rows



In [0]:
spark.sql("SELECT User_ID as userID FROM trainDFTable").show(2)

+-------+
| userID|
+-------+
|1000001|
|1000001|
+-------+
only showing top 2 rows



In [0]:
trainDF.selectExpr('User_ID as userId','Product_ID as productID').show(2)

+-------+---------+
| userId|productID|
+-------+---------+
|1000001|P00069042|
|1000001|P00248942|
+-------+---------+
only showing top 2 rows



In [0]:
trainDF.select("User_ID","Product_ID","Age").show(3)

+-------+----------+----+
|User_ID|Product_ID| Age|
+-------+----------+----+
|1000001| P00069042|0-17|
|1000001| P00248942|0-17|
|1000001| P00087842|0-17|
+-------+----------+----+
only showing top 3 rows



In [0]:
from pyspark.sql.functions import lit 
trainDF.select("*",lit(1).alias("ConstantOne")).show(3)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|ConstantOne|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|          1|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|          1|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|           

In [0]:
## In Spark SQL - added constant value
spark.sql("SELECT *, 1 as ConstatnOne FROM trainDFTable LIMIT 3").show()

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|ConstatnOne|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|          1|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|          1|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|           

In [0]:
trainDF.show(3)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|              null|              null|    1422|
+---

In [0]:
trainDF = trainDF.withColumn("ConstantOne",lit(1))
trainDF.show(3)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|ConstantOne|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|          1|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|          1|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|           

In [0]:
trainDF.withColumn("OccupationOne",trainDF.Occupation + 1).show()

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+-------------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|ConstantOne|OccupationOne|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+-------------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|          1|           11|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|          1|           11|
|1000001| P00087842|     F| 0-17|  

In [0]:
tempDF = trainDF.withColumn('SameCategoryCode',
         trainDF['Product_Category_1'] == trainDF['Product_Category_2'])
tempDF.show(3)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+----------------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|ConstantOne|SameCategoryCode|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+----------------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|          1|            null|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|          1|           false|
|1000001| P00087842|     

In [0]:
tempDF.dtypes

[('User_ID', 'int'),
 ('Product_ID', 'string'),
 ('Gender', 'string'),
 ('Age', 'string'),
 ('Occupation', 'int'),
 ('City_Category', 'string'),
 ('Stay_In_Current_City_Years', 'string'),
 ('Marital_Status', 'int'),
 ('Product_Category_1', 'int'),
 ('Product_Category_2', 'int'),
 ('Product_Category_3', 'int'),
 ('Purchase', 'int'),
 ('ConstantOne', 'int'),
 ('SameCategoryCode', 'boolean')]

In [0]:
tempDF.filter(col('SameCategoryCode')== False).show(3)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+----------------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|ConstantOne|SameCategoryCode|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+----------------+
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|          1|           false|
|1000001| P00085442|     F| 0-17|        10|            A|                         2|             0|                12|                14|              null|    1057|          1|           false|
|1000003| P00193542|

In [0]:
tempDF.withColumnRenamed('SameCategoryCode','SimilarCategroyCode').show()

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+-------------------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|ConstantOne|SimilarCategroyCode|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+-------------------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|          1|               null|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|          1|              false|
|1000

In [0]:
tempDF.show(3)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+----------------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|ConstantOne|SameCategoryCode|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+----------------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|          1|            null|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|          1|           false|
|1000001| P00087842|     

In [0]:
tempDF.drop('SameCategoryCode').show(3)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|ConstantOne|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|          1|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|          1|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|           

In [0]:
tempDF.show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+----------------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|ConstantOne|SameCategoryCode|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+----------------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|          1|            null|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|          1|           false|
+-------+----------+-----

In [0]:
tempDF.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)
 |-- ConstantOne: integer (nullable = false)
 |-- SameCategoryCode: boolean (nullable = true)



In [0]:
tempDF.withColumn("Purchase",col("Purchase").cast("String")).printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: string (nullable = true)
 |-- ConstantOne: integer (nullable = false)
 |-- SameCategoryCode: boolean (nullable = true)



In [0]:
trainDF.select('Product_ID').distinct().count()

3631

In [0]:
testDF = testDF.withColumn('Product_ID',col('ProductID'))

In [0]:
testDF.select('Product_ID').distinct().count()

3491

In [0]:
diff_cat_in_test_train = testDF.select('Product_ID').subtract(trainDF.select('Product_ID'))
diff_cat_in_test_train.count()

46

In [0]:
diff_cat_in_train_test = trainDF.select('Product_ID').subtract(testDF.select('Product_ID'))
diff_cat_in_train_test.count()

186

In [0]:
trainDF.crosstab('Age','Gender').show()

+----------+-----+------+
|Age_Gender|    F|     M|
+----------+-----+------+
|      0-17| 5083| 10019|
|     46-50|13199| 32502|
|     18-25|24628| 75032|
|     36-45|27170| 82843|
|       55+| 5083| 16421|
|     51-55| 9894| 28607|
|     26-35|50752|168835|
+----------+-----+------+



In [0]:
trainDF.crosstab('Gender','Age').show()

+----------+-----+-----+------+-----+-----+-----+-----+
|Gender_Age| 0-17|18-25| 26-35|36-45|46-50|51-55|  55+|
+----------+-----+-----+------+-----+-----+-----+-----+
|         M|10019|75032|168835|82843|32502|28607|16421|
|         F| 5083|24628| 50752|27170|13199| 9894| 5083|
+----------+-----+-----+------+-----+-----+-----+-----+



In [0]:
trainDF.groupBy('Age','Gender').count().show()

+-----+------+------+
|  Age|Gender| count|
+-----+------+------+
|51-55|     F|  9894|
|18-25|     M| 75032|
| 0-17|     F|  5083|
|46-50|     M| 32502|
|18-25|     F| 24628|
|  55+|     M| 16421|
|  55+|     F|  5083|
|36-45|     M| 82843|
|26-35|     F| 50752|
| 0-17|     M| 10019|
|36-45|     F| 27170|
|51-55|     M| 28607|
|26-35|     M|168835|
|46-50|     F| 13199|
+-----+------+------+



In [0]:
trainDF.groupBy('Gender','Age').count().show()

+------+-----+------+
|Gender|  Age| count|
+------+-----+------+
|     F|46-50| 13199|
|     M| 0-17| 10019|
|     M|26-35|168835|
|     M|51-55| 28607|
|     M|18-25| 75032|
|     M|  55+| 16421|
|     F|51-55|  9894|
|     F|36-45| 27170|
|     F|18-25| 24628|
|     F|  55+|  5083|
|     M|36-45| 82843|
|     F| 0-17|  5083|
|     M|46-50| 32502|
|     F|26-35| 50752|
+------+-----+------+



In [0]:
spark.sql(""" SELECT Age,
      sum(case when Gender = 'F' then 1 else 0 end) Female,
      sum(case when Gender = 'M' then 1 else 0 end) Male
       from trainDFTable
       group by Age 
""").show()

+-----+------+------+
|  Age|Female|  Male|
+-----+------+------+
|18-25| 24628| 75032|
|26-35| 50752|168835|
| 0-17|  5083| 10019|
|46-50| 13199| 32502|
|51-55|  9894| 28607|
|36-45| 27170| 82843|
|  55+|  5083| 16421|
+-----+------+------+



In [0]:
trainDF.select('Age','Gender').show(20)

+-----+------+
|  Age|Gender|
+-----+------+
| 0-17|     F|
| 0-17|     F|
| 0-17|     F|
| 0-17|     F|
|  55+|     M|
|26-35|     M|
|46-50|     M|
|46-50|     M|
|46-50|     M|
|26-35|     M|
|26-35|     M|
|26-35|     M|
|26-35|     M|
|26-35|     M|
|51-55|     F|
|51-55|     F|
|51-55|     F|
|51-55|     F|
|36-45|     M|
|26-35|     M|
+-----+------+
only showing top 20 rows



In [0]:
trainDF.select('Age','Gender').dropDuplicates().show(20)

+-----+------+
|  Age|Gender|
+-----+------+
|51-55|     F|
|18-25|     M|
| 0-17|     F|
|46-50|     M|
|18-25|     F|
|  55+|     M|
|  55+|     F|
|36-45|     M|
|26-35|     F|
| 0-17|     M|
|36-45|     F|
|51-55|     M|
|26-35|     M|
|46-50|     F|
+-----+------+



In [0]:
trainDF.dropna().count()

166821

In [0]:
trainDF.na.drop('any').count()

166821

In [0]:
trainDF.na.drop().count()

166821

In [0]:
trainDF.fillna(-1).show(3)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|ConstantOne|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                -1|                -1|    8370|          1|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|          1|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|           

In [0]:
fill_col_vals = {
    'Gender':'M',
    'Purchase':999999,
    'Product_Category_3': -1
}
trainDF.na.fill(fill_col_vals).show(3)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|ConstantOne|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|                -1|    8370|          1|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|          1|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|           

In [0]:
trainDF.na.replace([""],["UNKNOWN"],"Gender").count()

550068

In [0]:
print('Purchase amount greater than 15000 {}'.format(trainDF.filter(trainDF.Purchase > 15000).count()))

Purchase amount greater than 15000 110523


In [0]:
print('Purchase amount greater than 15000 {}'.format(trainDF.filter(col('Purchase') > 15000).count()))

Purchase amount greater than 15000 110523


In [0]:
#print('Purchase amount greater than 15000 {}'.format(trainDF.filter(column('Purchase') > 15000).count()))

In [0]:
print('Purchase amount greater than 15000 {}'.format(trainDF.filter(expr('Purchase') > 15000).count()))

Purchase amount greater than 15000 110523


In [0]:
print('Purchase amount greater than 15000 {}'.format(trainDF.filter(trainDF["Purchase"] > 15000).count()))

Purchase amount greater than 15000 110523


In [0]:
spark.sql("""
    SELECT COUNT(*) AS Count 
    FROM trainDFTable WHERE Purchase > 15000
""").show(3)

+------+
| Count|
+------+
|110523|
+------+



In [0]:
trainDF.where((trainDF.Purchase > 15000) & (trainDF.Gender == 'F')).count()

21429

In [0]:
trainDF.where("Purchase > 15000").where("Gender = 'F'").count()

21429

In [0]:
trainDF.filter("Purchase > 15000").where("Gender = 'F'").count()

21429

In [0]:
trainDF.where((col("Purchase") > 15000) & (col("Gender") == 'F')).count()

21429

In [0]:
trainDF.filter((col("Purchase") > 15000) & (col("Gender") == 'F')).count()

21429

In [0]:
spark.sql("SELECT * FROM trainDFTable where Purchase > 15000 AND GENDER ='F'").count()

21429

In [0]:
from pyspark.sql.functions import countDistinct
trainDF.select(countDistinct("Age").alias("DISTINCT_Age")).show()

+------------+
|DISTINCT_Age|
+------------+
|           7|
+------------+



In [0]:
trainDF.agg(countDistinct("Age").alias("DISTINCT_Age")).show()

+------------+
|DISTINCT_Age|
+------------+
|           7|
+------------+



In [0]:
from pyspark.sql import functions as func 

In [0]:
trainDF.show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|ConstantOne|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|          1|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|          1|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+-----------

In [0]:
trainDF.agg(func.min('Age'),func.avg('Purchase')).show()

+--------+-----------------+
|min(Age)|    avg(Purchase)|
+--------+-----------------+
|    0-17|9263.968712959126|
+--------+-----------------+



In [0]:
trainDF.groupby('Age').count().show()

+-----+------+
|  Age| count|
+-----+------+
|18-25| 99660|
|26-35|219587|
| 0-17| 15102|
|46-50| 45701|
|51-55| 38501|
|36-45|110013|
|  55+| 21504|
+-----+------+



In [0]:
from pyspark.sql.functions import approx_count_distinct

In [0]:
?approx_count_distinct

In [0]:
trainDF.select(approx_count_distinct("Age",0.1)).show()

+--------------------------+
|approx_count_distinct(Age)|
+--------------------------+
|                         7|
+--------------------------+



In [0]:
from pyspark.sql.functions import first,last 
trainDF.select(first("Product_ID"),last("Product_ID")).show()

+-----------------+----------------+
|first(Product_ID)|last(Product_ID)|
+-----------------+----------------+
|        P00069042|       P00371644|
+-----------------+----------------+



In [0]:
from pyspark.sql.functions import min,max
trainDF.select(min("Purchase"),max("Purchase")).show()

+-------------+-------------+
|min(Purchase)|max(Purchase)|
+-------------+-------------+
|           12|        23961|
+-------------+-------------+



In [0]:
from pyspark.sql.functions import sum
trainDF.select(sum("Purchase")).show()

+-------------+
|sum(Purchase)|
+-------------+
|   5095812742|
+-------------+



In [0]:
from pyspark.sql.functions import sumDistinct
trainDF.select(sumDistinct("Purchase")).show()

+----------------------+
|sum(DISTINCT Purchase)|
+----------------------+
|             208520914|
+----------------------+



In [0]:
from pyspark.sql.functions import sum,count,avg,expr
trainDF.select(
     count("Purchase").alias("total_transactions"),
     sum("Purchase").alias("total_purchases"),
     avg("Purchase").alias("avg_purchases"),
     expr("mean(Purchase)").alias("mean_Purchases")
).selectExpr(
    "total_purchases/total_transactions",
    "avg_purchases",
    "mean_Purchases"
    ).show()

+--------------------------------------+-----------------+-----------------+
|(total_purchases / total_transactions)|    avg_purchases|   mean_Purchases|
+--------------------------------------+-----------------+-----------------+
|                     9263.968712959126|9263.968712959126|9263.968712959126|
+--------------------------------------+-----------------+-----------------+



In [0]:
from pyspark.sql.functions import var_pop,var_samp,stddev_pop,stddev_samp
trainDF.select(var_pop('Purchase'),var_samp('Purchase'),stddev_pop('Purchase'),stddev_samp('Purchase')).show()

+--------------------+-------------------+--------------------+---------------------+
|   var_pop(Purchase)| var_samp(Purchase)|stddev_pop(Purchase)|stddev_samp(Purchase)|
+--------------------+-------------------+--------------------+---------------------+
|2.5231140081385408E7|2.523118595059785E7|   5023.060827959921|    5023.065393820575|
+--------------------+-------------------+--------------------+---------------------+



In [0]:
from pyspark.sql.functions import skewness,kurtosis
trainDF.select(skewness("Purchase"),kurtosis("Purchase")).show()

+------------------+-------------------+
|skewness(Purchase)| kurtosis(Purchase)|
+------------------+-------------------+
|0.6001383671643392|-0.3383853975360327|
+------------------+-------------------+



In [0]:
spark.sql("SELECT skewness(Purchase), kurtosis(Purchase) FROM trainDFTable").show()

+----------------------------------+----------------------------------+
|skewness(CAST(Purchase AS DOUBLE))|kurtosis(CAST(Purchase AS DOUBLE))|
+----------------------------------+----------------------------------+
|                0.6001383671643392|               -0.3383853975360327|
+----------------------------------+----------------------------------+



In [0]:
from pyspark.sql.functions import corr, covar_pop,covar_samp
trainDF.select(corr("Purchase","Product_Category_1"),covar_samp("Purchase","Product_Category_1"),
               covar_pop("Purchase","Product_Category_1")).show()

+----------------------------------+----------------------------------------+---------------------------------------+
|corr(Purchase, Product_Category_1)|covar_samp(Purchase, Product_Category_1)|covar_pop(Purchase, Product_Category_1)|
+----------------------------------+----------------------------------------+---------------------------------------+
|               -0.3437033459199084|                      -6795.650007204535|                     -6795.637653004677|
+----------------------------------+----------------------------------------+---------------------------------------+



In [0]:
from pyspark.sql.functions import collect_list,collect_set
trainDF.agg(collect_set("Age"),collect_list("Age")).show()

+--------------------+--------------------+
|    collect_set(Age)|   collect_list(Age)|
+--------------------+--------------------+
|[55+, 51-55, 0-17...|[0-17, 0-17, 0-17...|
+--------------------+--------------------+



In [0]:
spark.sql("SELECT collect_set(Age), collect_list(Age) FROM trainDFTable").show() 

+--------------------+--------------------+
|    collect_set(Age)|   collect_list(Age)|
+--------------------+--------------------+
|[55+, 51-55, 0-17...|[0-17, 0-17, 0-17...|
+--------------------+--------------------+



In [0]:
trainDF.groupBy('Age').agg(count("Purchase").alias("quan")
                       ,expr("count(Purchase)")).show()

+-----+------+---------------+
|  Age|  quan|count(Purchase)|
+-----+------+---------------+
|18-25| 99660|          99660|
|26-35|219587|         219587|
| 0-17| 15102|          15102|
|46-50| 45701|          45701|
|51-55| 38501|          38501|
|36-45|110013|         110013|
|  55+| 21504|          21504|
+-----+------+---------------+



In [0]:
trainDF.groupBy('Age').agg(expr('Avg(Purchase)'),expr("stddev_pop(Purchase)")).show()

+-----+-----------------+--------------------+
|  Age|    Avg(Purchase)|stddev_pop(Purchase)|
+-----+-----------------+--------------------+
|18-25|9169.663606261289|  5034.2967396277945|
|26-35|9252.690632869888|   5010.515894010154|
| 0-17|8933.464640444974|   5110.944823427661|
|46-50|9208.625697468327|   4967.162022122706|
|51-55|9534.808030960236|   5087.302011173869|
|36-45|9331.350694917874|   5022.901050378538|
|  55+|9336.280459449405|    5011.37746955577|
+-----+-----------------+--------------------+



In [0]:
trainDF.groupBy('Age').agg({'Purchase':'mean'}).show()

+-----+-----------------+
|  Age|    avg(Purchase)|
+-----+-----------------+
|18-25|9169.663606261289|
|26-35|9252.690632869888|
| 0-17|8933.464640444974|
|46-50|9208.625697468327|
|51-55|9534.808030960236|
|36-45|9331.350694917874|
|  55+|9336.280459449405|
+-----+-----------------+



In [0]:
trainDF.groupBy('Age').agg({'Purchase':'sum'}).show()

+-----+-------------+
|  Age|sum(Purchase)|
+-----+-------------+
|18-25|    913848675|
|26-35|   2031770578|
| 0-17|    134913183|
|46-50|    420843403|
|51-55|    367099644|
|36-45|   1026569884|
|  55+|    200767375|
+-----+-------------+



In [0]:
exprs = {x : 'sum' for x in trainDF.columns}
trainDF.groupBy('Age').agg(exprs).show()

+-----+------------------+-----------------------+-------------------+-------------+------------+---------------+-------------------------------+-----------------------+--------+----------------+-----------+-----------------------+---------------+
|  Age|sum(City_Category)|sum(Product_Category_3)|sum(Marital_Status)|sum(Purchase)|sum(User_ID)|sum(Occupation)|sum(Stay_In_Current_City_Years)|sum(Product_Category_1)|sum(Age)|sum(ConstantOne)|sum(Gender)|sum(Product_Category_2)|sum(Product_ID)|
+-----+------------------+-----------------------+-------------------+-------------+------------+---------------+-------------------------------+-----------------------+--------+----------------+-----------+-----------------------+---------------+
|18-25|              null|                 388041|              21116|    913848675| 99939196632|         671348|                       116997.0|                 509371|    null|           99660|       null|                 654936|           null|
|26-35| 

In [0]:
###Joins
person = spark.createDataFrame(
    [(0,"Dr Dakshina Murthy",0,[250,100]),
     (1,"Dr Sridhar Pappu",1,[500,250,100]),
     (2,"Dr Manoj Duse",2,[100])  
    ]
).toDF("id","name","graduate_program","role_status")

In [0]:
person

DataFrame[id: bigint, name: string, graduate_program: bigint, role_status: array<bigint>]

In [0]:
person.show()

+---+------------------+----------------+---------------+
| id|              name|graduate_program|    role_status|
+---+------------------+----------------+---------------+
|  0|Dr Dakshina Murthy|               0|     [250, 100]|
|  1|  Dr Sridhar Pappu|               1|[500, 250, 100]|
|  2|     Dr Manoj Duse|               2|          [100]|
+---+------------------+----------------+---------------+



In [0]:
graduateProgram = spark.createDataFrame([
                   (0,"Ph.D","School of Information","Carnegie Mellon University"),
                   (1,"Ph.D","The University of Texas","El Paso"),
                   (2,"Ph.D","School Of Information","Oklhama University"),
                                          

]).toDF("id","degree","dept","school")

In [0]:
graduateProgram.show(truncate=False)

+---+------+-----------------------+--------------------------+
|id |degree|dept                   |school                    |
+---+------+-----------------------+--------------------------+
|0  |Ph.D  |School of Information  |Carnegie Mellon University|
|1  |Ph.D  |The University of Texas|El Paso                   |
|2  |Ph.D  |School Of Information  |Oklhama University        |
+---+------+-----------------------+--------------------------+



In [0]:
roleStatus = spark.createDataFrame([
               (500,"President"),
               (250,"Founder"),
               (100,"Mentor")                                   
]).toDF("id","status")

In [0]:
roleStatus.show()

+---+---------+
| id|   status|
+---+---------+
|500|President|
|250|  Founder|
|100|   Mentor|
+---+---------+



In [0]:
person.createOrReplaceTempView("personTbl")
graduateProgram.createOrReplaceTempView("graduateProgramTbl")
roleStatus.createOrReplaceTempView("roleStatusTbl")

In [0]:
joinExpression = person["graduate_program"] == graduateProgram["id"]
person.join(graduateProgram,joinExpression).show(truncate=False)

+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|id |name              |graduate_program|role_status    |id |degree|dept                   |school                    |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|0  |Dr Dakshina Murthy|0               |[250, 100]     |0  |Ph.D  |School of Information  |Carnegie Mellon University|
|1  |Dr Sridhar Pappu  |1               |[500, 250, 100]|1  |Ph.D  |The University of Texas|El Paso                   |
|2  |Dr Manoj Duse     |2               |[100]          |2  |Ph.D  |School Of Information  |Oklhama University        |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+



In [0]:
spark.sql("""SELECT * FROM personTbl JOIN graduateProgramTbl 
ON personTbl.graduate_program = graduateProgramTbl.id """).show(truncate=False)

+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|id |name              |graduate_program|role_status    |id |degree|dept                   |school                    |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|0  |Dr Dakshina Murthy|0               |[250, 100]     |0  |Ph.D  |School of Information  |Carnegie Mellon University|
|1  |Dr Sridhar Pappu  |1               |[500, 250, 100]|1  |Ph.D  |The University of Texas|El Paso                   |
|2  |Dr Manoj Duse     |2               |[100]          |2  |Ph.D  |School Of Information  |Oklhama University        |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+



In [0]:
joinType = "inner"
person.join(graduateProgram,joinExpression,joinType).show(truncate=False)

+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|id |name              |graduate_program|role_status    |id |degree|dept                   |school                    |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|0  |Dr Dakshina Murthy|0               |[250, 100]     |0  |Ph.D  |School of Information  |Carnegie Mellon University|
|1  |Dr Sridhar Pappu  |1               |[500, 250, 100]|1  |Ph.D  |The University of Texas|El Paso                   |
|2  |Dr Manoj Duse     |2               |[100]          |2  |Ph.D  |School Of Information  |Oklhama University        |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+



In [0]:
joinType = "outer"
person.join(graduateProgram,joinExpression,joinType).show(truncate=False)

+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|id |name              |graduate_program|role_status    |id |degree|dept                   |school                    |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|0  |Dr Dakshina Murthy|0               |[250, 100]     |0  |Ph.D  |School of Information  |Carnegie Mellon University|
|1  |Dr Sridhar Pappu  |1               |[500, 250, 100]|1  |Ph.D  |The University of Texas|El Paso                   |
|2  |Dr Manoj Duse     |2               |[100]          |2  |Ph.D  |School Of Information  |Oklhama University        |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+



In [0]:
spark.sql("""SELECT * FROM personTbl INNER JOIN graduateProgramTbl 
ON personTbl.graduate_program = graduateProgramTbl.id """).show(truncate=False)

+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|id |name              |graduate_program|role_status    |id |degree|dept                   |school                    |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|0  |Dr Dakshina Murthy|0               |[250, 100]     |0  |Ph.D  |School of Information  |Carnegie Mellon University|
|1  |Dr Sridhar Pappu  |1               |[500, 250, 100]|1  |Ph.D  |The University of Texas|El Paso                   |
|2  |Dr Manoj Duse     |2               |[100]          |2  |Ph.D  |School Of Information  |Oklhama University        |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+



In [0]:
joinType = "left_outer"
person.join(graduateProgram,joinExpression,joinType).show(truncate=False)

+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|id |name              |graduate_program|role_status    |id |degree|dept                   |school                    |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|0  |Dr Dakshina Murthy|0               |[250, 100]     |0  |Ph.D  |School of Information  |Carnegie Mellon University|
|1  |Dr Sridhar Pappu  |1               |[500, 250, 100]|1  |Ph.D  |The University of Texas|El Paso                   |
|2  |Dr Manoj Duse     |2               |[100]          |2  |Ph.D  |School Of Information  |Oklhama University        |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+



In [0]:
joinType = "right_outer"
person.join(graduateProgram,joinExpression,joinType).show(truncate=False)

+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|id |name              |graduate_program|role_status    |id |degree|dept                   |school                    |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|0  |Dr Dakshina Murthy|0               |[250, 100]     |0  |Ph.D  |School of Information  |Carnegie Mellon University|
|1  |Dr Sridhar Pappu  |1               |[500, 250, 100]|1  |Ph.D  |The University of Texas|El Paso                   |
|2  |Dr Manoj Duse     |2               |[100]          |2  |Ph.D  |School Of Information  |Oklhama University        |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+



In [0]:
spark.sql("""SELECT * FROM personTbl LEFT OUTER JOIN graduateProgramTbl 
ON personTbl.graduate_program = graduateProgramTbl.id """).show(truncate=False)

+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|id |name              |graduate_program|role_status    |id |degree|dept                   |school                    |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|0  |Dr Dakshina Murthy|0               |[250, 100]     |0  |Ph.D  |School of Information  |Carnegie Mellon University|
|1  |Dr Sridhar Pappu  |1               |[500, 250, 100]|1  |Ph.D  |The University of Texas|El Paso                   |
|2  |Dr Manoj Duse     |2               |[100]          |2  |Ph.D  |School Of Information  |Oklhama University        |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+



In [0]:
spark.sql("""SELECT * FROM personTbl RIGHT OUTER JOIN graduateProgramTbl 
ON personTbl.graduate_program = graduateProgramTbl.id """).show(truncate=False)

+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|id |name              |graduate_program|role_status    |id |degree|dept                   |school                    |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|0  |Dr Dakshina Murthy|0               |[250, 100]     |0  |Ph.D  |School of Information  |Carnegie Mellon University|
|1  |Dr Sridhar Pappu  |1               |[500, 250, 100]|1  |Ph.D  |The University of Texas|El Paso                   |
|2  |Dr Manoj Duse     |2               |[100]          |2  |Ph.D  |School Of Information  |Oklhama University        |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+



In [0]:
joinType = "cross"
person.join(graduateProgram,joinExpression,joinType).show(truncate=False)

+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|id |name              |graduate_program|role_status    |id |degree|dept                   |school                    |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|0  |Dr Dakshina Murthy|0               |[250, 100]     |0  |Ph.D  |School of Information  |Carnegie Mellon University|
|1  |Dr Sridhar Pappu  |1               |[500, 250, 100]|1  |Ph.D  |The University of Texas|El Paso                   |
|2  |Dr Manoj Duse     |2               |[100]          |2  |Ph.D  |School Of Information  |Oklhama University        |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+



In [0]:
spark.sql("""SELECT * FROM personTbl CROSS JOIN graduateProgramTbl 
ON personTbl.graduate_program = graduateProgramTbl.id """).show(truncate=False)

+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|id |name              |graduate_program|role_status    |id |degree|dept                   |school                    |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+
|0  |Dr Dakshina Murthy|0               |[250, 100]     |0  |Ph.D  |School of Information  |Carnegie Mellon University|
|1  |Dr Sridhar Pappu  |1               |[500, 250, 100]|1  |Ph.D  |The University of Texas|El Paso                   |
|2  |Dr Manoj Duse     |2               |[100]          |2  |Ph.D  |School Of Information  |Oklhama University        |
+---+------------------+----------------+---------------+---+------+-----------------------+--------------------------+



In [0]:
from pyspark.sql.functions import expr
person.withColumnRenamed("id","personId").join(roleStatus,expr("array_contains(role_status,id)")).show()

+--------+------------------+----------------+---------------+---+---------+
|personId|              name|graduate_program|    role_status| id|   status|
+--------+------------------+----------------+---------------+---+---------+
|       0|Dr Dakshina Murthy|               0|     [250, 100]|250|  Founder|
|       0|Dr Dakshina Murthy|               0|     [250, 100]|100|   Mentor|
|       1|  Dr Sridhar Pappu|               1|[500, 250, 100]|500|President|
|       1|  Dr Sridhar Pappu|               1|[500, 250, 100]|250|  Founder|
|       1|  Dr Sridhar Pappu|               1|[500, 250, 100]|100|   Mentor|
|       2|     Dr Manoj Duse|               2|          [100]|100|   Mentor|
+--------+------------------+----------------+---------------+---+---------+



In [0]:
spark.sql(""" SELECT * FROM 
    (select id as personId, name,graduate_program,role_status FROM personTbl)
    INNER JOIN roleStatusTbl ON array_contains(role_status,id)
""").show()

+--------+------------------+----------------+---------------+---+---------+
|personId|              name|graduate_program|    role_status| id|   status|
+--------+------------------+----------------+---------------+---+---------+
|       0|Dr Dakshina Murthy|               0|     [250, 100]|250|  Founder|
|       0|Dr Dakshina Murthy|               0|     [250, 100]|100|   Mentor|
|       1|  Dr Sridhar Pappu|               1|[500, 250, 100]|500|President|
|       1|  Dr Sridhar Pappu|               1|[500, 250, 100]|250|  Founder|
|       1|  Dr Sridhar Pappu|               1|[500, 250, 100]|100|   Mentor|
|       2|     Dr Manoj Duse|               2|          [100]|100|   Mentor|
+--------+------------------+----------------+---------------+---+---------+



In [0]:
trainDF.count()

550068

In [0]:
sample1 = trainDF.sample(False,0.1,1234)
sample2 = trainDF.sample(False,0.1,2345)

In [0]:
sample1.count()

55488

In [0]:
sample2.count()

54712

In [0]:
splitDF = trainDF.randomSplit([0.7,0.3],seed=8787)

In [0]:
type(splitDF)

list

In [0]:
splitDF

[DataFrame[User_ID: int, Product_ID: string, Gender: string, Age: string, Occupation: int, City_Category: string, Stay_In_Current_City_Years: string, Marital_Status: int, Product_Category_1: int, Product_Category_2: int, Product_Category_3: int, Purchase: int, ConstantOne: int],
 DataFrame[User_ID: int, Product_ID: string, Gender: string, Age: string, Occupation: int, City_Category: string, Stay_In_Current_City_Years: string, Marital_Status: int, Product_Category_1: int, Product_Category_2: int, Product_Category_3: int, Purchase: int, ConstantOne: int]]

In [0]:
print(splitDF[0].count())
print(splitDF[1].count())

385109
164959


In [0]:
trainDF.select('User_ID').rdd.map(lambda x : (x,1)).take(5)

[(Row(User_ID=1000001), 1),
 (Row(User_ID=1000001), 1),
 (Row(User_ID=1000001), 1),
 (Row(User_ID=1000001), 1),
 (Row(User_ID=1000002), 1)]

In [0]:
trainDF.orderBy(trainDF.Purchase.desc()).show()

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1002272| P00052842|     M|26-35|         0|            C|                         1|             0|                10|                15|              null|   23961|
|1003160| P00052842|     M|26-35|        17|            C|                         3|             0|                10|                15|              null|   23961|
|1001474| P00052842|     M|26-35|         4|            A|                         2|             1|                10|                15|              null|   23961

In [0]:
trainDF.rdd.getNumPartitions()

2

In [0]:
trainDF.repartition(10).rdd.getNumPartitions()

10

In [0]:
trainDF.repartition(5,col('Purchase')).rdd.getNumPartitions()

5

In [0]:
trainDF.repartition(5,col('Purchase')).coalesce(2).rdd.getNumPartitions()   

2

In [0]:
trainDF.withColumn("Purchase_new",trainDF.Purchase/2.0).select('Purchase','Purchase_new').show()

+--------+------------+
|Purchase|Purchase_new|
+--------+------------+
|    8370|      4185.0|
|   15200|      7600.0|
|    1422|       711.0|
|    1057|       528.5|
|    7969|      3984.5|
|   15227|      7613.5|
|   19215|      9607.5|
|   15854|      7927.0|
|   15686|      7843.0|
|    7871|      3935.5|
|    5254|      2627.0|
|    3957|      1978.5|
|    6073|      3036.5|
|   15665|      7832.5|
|    5378|      2689.0|
|    2079|      1039.5|
|   13055|      6527.5|
|    8851|      4425.5|
|   11788|      5894.0|
|   19614|      9807.0|
+--------+------------+
only showing top 20 rows



In [0]:
diff_cat_in_train_test

DataFrame[Product_ID: string]

In [0]:
diff_cat_in_train_test.count()

186

In [0]:
diff_cat_in_test_train

DataFrame[Product_ID: string]

In [0]:
diff_cat_in_test_train.count()

46

In [0]:
diff_cat_in_test_train.show(2)

+----------+
|Product_ID|
+----------+
| P00322642|
| P00300142|
+----------+
only showing top 2 rows



In [0]:
not_count_cat = diff_cat_in_test_train.rdd.map(lambda x : x[0]).collect()

In [0]:
not_count_cat

['P00322642',
 'P00300142',
 'P00077642',
 'P00249942',
 'P00294942',
 'P00106242',
 'P00239542',
 'P00074942',
 'P00092742',
 'P00082142',
 'P00030342',
 'P00062542',
 'P00063942',
 'P00013042',
 'P00279042',
 'P00227242',
 'P00359842',
 'P00061642',
 'P00042642',
 'P0099542',
 'P00306842',
 'P00140842',
 'P00165542',
 'P00322842',
 'P00268942',
 'P00236842',
 'P00038942',
 'P00172942',
 'P00012642',
 'P00270342',
 'P00312642',
 'P00336842',
 'P00105742',
 'P00309842',
 'P00166542',
 'P00082642',
 'P00253842',
 'P00062242',
 'P00100242',
 'P00315342',
 'P00058842',
 'P00168242',
 'P00156942',
 'P00039042',
 'P00056942',
 'P00204642']

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
Function1 = udf(lambda x : '-1' if x in not_count_cat else x, StringType())

In [0]:
k = testDF.withColumn("NewProductId",Function1(testDF["ProductId"])).select("NewProductId")

In [0]:
k.where(k["NewProductId"] == -1).show(3)

+------------+
|NewProductId|
+------------+
|          -1|
|          -1|
|          -1|
+------------+
only showing top 3 rows



In [0]:
from pyspark.sql.functions import lit,round,bround
trainDF.select(round(lit("2.5")),bround(lit(2.5))).show(2)

+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|          3.0|           2.0|
|          3.0|           2.0|
+-------------+--------------+
only showing top 2 rows



In [0]:
spark.sql("SELECT round(2.9), round(2.5), bround(2.4),bround(2.9)").show(3)

+-------------+-------------+--------------+--------------+
|round(2.9, 0)|round(2.5, 0)|bround(2.4, 0)|bround(2.9, 0)|
+-------------+-------------+--------------+--------------+
|            3|            3|             2|             3|
+-------------+-------------+--------------+--------------+



In [0]:
spark.sql("SELECT round(2.5), bround(2.5)").show(2)

+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|            3|             2|
+-------------+--------------+



In [0]:
### String manipulations

In [0]:
from pyspark.sql.functions import lit,ltrim,rtrim,lpad,rpad,trim
trainDF.select(
   ltrim(lit(" HELLO ")).alias("ltrim"),
   rtrim(lit(" HELLO ")).alias("rtrim"),
   trim(lit(" HELLO ")).alias("trim"),
   lpad(lit("HELLO"),7," ").alias("lpad"),
   rpad(lit("HELLO"),7, " ").alias("rpad") 
).show(2)

+------+------+-----+-------+-------+
| ltrim| rtrim| trim|   lpad|   rpad|
+------+------+-----+-------+-------+
|HELLO | HELLO|HELLO|  HELLO|HELLO  |
|HELLO | HELLO|HELLO|  HELLO|HELLO  |
+------+------+-----+-------+-------+
only showing top 2 rows



In [0]:
spark.sql("""SELECT
ltrim('  HELLLOO   '),
rtrim('  HELLLOO   '),
trim('  HELLLOO   '),
lpad('HELLO',3,' '),
rpad('HELLO ',10,' ')
FROM trainDFTable
""").show(3)

+-------------------+-------------------+------------------+-----------------+-------------------+
|ltrim(  HELLLOO   )|rtrim(  HELLLOO   )|trim(  HELLLOO   )|lpad(HELLO, 3,  )|rpad(HELLO , 10,  )|
+-------------------+-------------------+------------------+-----------------+-------------------+
|         HELLLOO   |            HELLLOO|           HELLLOO|              HEL|         HELLO     |
|         HELLLOO   |            HELLLOO|           HELLLOO|              HEL|         HELLO     |
|         HELLLOO   |            HELLLOO|           HELLLOO|              HEL|         HELLO     |
+-------------------+-------------------+------------------+-----------------+-------------------+
only showing top 3 rows



In [0]:
from pyspark.sql.functions import regexp_replace
regex_string = "F|M"

trainDF.select(
regexp_replace(col("Gender"),regex_string,"MALE_OR_FEMALE")
.alias("GENDER_DECODE"),
col("GENDER")   

).show(5)

+--------------+------+
| GENDER_DECODE|GENDER|
+--------------+------+
|MALE_OR_FEMALE|     F|
|MALE_OR_FEMALE|     F|
|MALE_OR_FEMALE|     F|
|MALE_OR_FEMALE|     F|
|MALE_OR_FEMALE|     M|
+--------------+------+
only showing top 5 rows



In [0]:
spark.sql("""
SELECT 
regexp_replace(Gender,'F|M', 'MALE_OR_FEMALE') AS DECODE_GENDER, GENDER FROM trainDFTable
""").show(4)

+--------------+------+
| DECODE_GENDER|GENDER|
+--------------+------+
|MALE_OR_FEMALE|     F|
|MALE_OR_FEMALE|     F|
|MALE_OR_FEMALE|     F|
|MALE_OR_FEMALE|     F|
+--------------+------+
only showing top 4 rows



In [0]:
from pyspark.sql.functions import translate
trainDF.select(
    translate(col("Gender"),"FM","01"),
    col("Gender")
).show(10)

+-------------------------+------+
|translate(Gender, FM, 01)|Gender|
+-------------------------+------+
|                        0|     F|
|                        0|     F|
|                        0|     F|
|                        0|     F|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
+-------------------------+------+
only showing top 10 rows



In [0]:
spark.sql("""
SELECT 
translate(Gender,'FM','01'),
Gender
FROM 
trainDFTable
""").show(10)

+-------------------------+------+
|translate(Gender, FM, 01)|Gender|
+-------------------------+------+
|                        0|     F|
|                        0|     F|
|                        0|     F|
|                        0|     F|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
+-------------------------+------+
only showing top 10 rows



In [0]:
from pyspark.sql.functions import current_date, current_timestamp
df_date = spark.range(10).withColumn("today",current_date()).withColumn("now",current_timestamp())
df_date.show(3)

+---+----------+--------------------+
| id|     today|                 now|
+---+----------+--------------------+
|  0|2021-07-16|2021-07-16 12:54:...|
|  1|2021-07-16|2021-07-16 12:54:...|
|  2|2021-07-16|2021-07-16 12:54:...|
+---+----------+--------------------+
only showing top 3 rows



In [0]:
df_date.createOrReplaceTempView("dataDFTable")

In [0]:
df_date.printSchema()

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)



In [0]:
from pyspark.sql.functions import date_add,date_sub
df_date.select(date_sub(col("today"),5),date_add(col("today"),5)).show(5)

+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2021-07-11|        2021-07-21|
|        2021-07-11|        2021-07-21|
|        2021-07-11|        2021-07-21|
|        2021-07-11|        2021-07-21|
|        2021-07-11|        2021-07-21|
+------------------+------------------+
only showing top 5 rows



In [0]:
spark.sql("""
SELECT 
date_sub(today,5),
date_add(today,5)
FROM 
dataDFTable
""").show()

+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2021-07-11|        2021-07-21|
|        2021-07-11|        2021-07-21|
|        2021-07-11|        2021-07-21|
|        2021-07-11|        2021-07-21|
|        2021-07-11|        2021-07-21|
|        2021-07-11|        2021-07-21|
|        2021-07-11|        2021-07-21|
|        2021-07-11|        2021-07-21|
|        2021-07-11|        2021-07-21|
|        2021-07-11|        2021-07-21|
+------------------+------------------+



In [0]:
from pyspark.sql.functions import datediff,months_between,to_date
df_date.withColumn("week_ago",date_sub(col("today"),7))\
.select(datediff(col("week_ago"),col("today")))\
.show(4)

+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
|                       -7|
|                       -7|
|                       -7|
|                       -7|
+-------------------------+
only showing top 4 rows



In [0]:
from pyspark.sql.functions import lit 
df_date.select(
    to_date(lit("2019-02-20")).alias("start"),
    to_date(lit("2020-11-10")).alias("end"))\
    .select(months_between(col("start"),col("end"))).show(5)


+--------------------------------+
|months_between(start, end, true)|
+--------------------------------+
|                    -20.67741935|
|                    -20.67741935|
|                    -20.67741935|
|                    -20.67741935|
|                    -20.67741935|
+--------------------------------+
only showing top 5 rows



In [0]:
spark.sql("""
   SELECT 
   to_date('2019-02-20'),
   months_between('2019-02-20','2020-11-10'),
   datediff('2020-02-20','2020-11-10')
   FROM 
   dataDFTable
""").show(5)

+-------------------+----------------------------------------------------------------------------------+------------------------------------------------------------+
|to_date(2019-02-20)|months_between(CAST(2019-02-20 AS TIMESTAMP), CAST(2020-11-10 AS TIMESTAMP), true)|datediff(CAST(2020-02-20 AS DATE), CAST(2020-11-10 AS DATE))|
+-------------------+----------------------------------------------------------------------------------+------------------------------------------------------------+
|         2019-02-20|                                                                      -20.67741935|                                                        -264|
|         2019-02-20|                                                                      -20.67741935|                                                        -264|
|         2019-02-20|                                                                      -20.67741935|                                                        -264|
|   

In [0]:
df_date.select(to_date(lit('2020-20-11')),to_date(lit('2020-11-20'))).show(2)
### year-month-day to year-day-month

+---------------------+---------------------+
|to_date('2020-20-11')|to_date('2020-11-20')|
+---------------------+---------------------+
|                 null|           2020-11-20|
|                 null|           2020-11-20|
+---------------------+---------------------+
only showing top 2 rows



In [0]:
yyyy-MM-dd -> yyyy-dd-MM

In [0]:
from pyspark.sql.functions import unix_timestamp, from_unixtime
dateFormat = "yyyy-dd-MM"
cleanDateDF = spark.range(1).select(to_date(unix_timestamp(lit("2020-12-11"),dateFormat).cast("timestamp"))\
.alias("date"),
to_date(unix_timestamp(lit('2020-20-11'),dateFormat).cast("timestamp")).alias("date_2")
)

In [0]:
cleanDateDF.show()

+----------+----------+
|      date|    date_2|
+----------+----------+
|2020-11-12|2020-11-20|
+----------+----------+



In [0]:
cleanDateDF.printSchema()

root
 |-- date: date (nullable = true)
 |-- date_2: date (nullable = true)



In [0]:
from pyspark.sql.functions import month,year, dayofweek
cleanDF = cleanDateDF.withColumn("month_col",month("date_2")).withColumn("year_col",year("date_2")).withColumn("dayOftheWeek",dayofweek("date_2"))

In [0]:
cleanDF.show()

+----------+----------+---------+--------+------------+
|      date|    date_2|month_col|year_col|dayOftheWeek|
+----------+----------+---------+--------+------------+
|2020-11-12|2020-11-20|       11|    2020|           6|
+----------+----------+---------+--------+------------+



In [0]:
cleanDF.createOrReplaceTempView("dateTable2")
spark.sql("""
 SELECT 
  to_date(cast(unix_timestamp(date_2,'yyyy-dd-MM') as timestamp)),
  to_date(date)
  FROM 
  dateTable2
""").show()

+--------------------------------------------------------------+-------------+
|to_date(CAST(unix_timestamp(date_2, yyyy-dd-MM) AS TIMESTAMP))|to_date(date)|
+--------------------------------------------------------------+-------------+
|                                                    2020-11-20|   2020-11-12|
+--------------------------------------------------------------+-------------+



In [0]:
textDF = spark.range(10).withColumn("Description",lit("We have a long sentence to be broken"))

In [0]:
textDF.show(truncate=False)

+---+------------------------------------+
|id |Description                         |
+---+------------------------------------+
|0  |We have a long sentence to be broken|
|1  |We have a long sentence to be broken|
|2  |We have a long sentence to be broken|
|3  |We have a long sentence to be broken|
|4  |We have a long sentence to be broken|
|5  |We have a long sentence to be broken|
|6  |We have a long sentence to be broken|
|7  |We have a long sentence to be broken|
|8  |We have a long sentence to be broken|
|9  |We have a long sentence to be broken|
+---+------------------------------------+



In [0]:
from pyspark.sql.functions import split
textDF.select(split(col("Description")," ")).show()

+---------------------+
|split(Description,  )|
+---------------------+
| [We, have, a, lon...|
| [We, have, a, lon...|
| [We, have, a, lon...|
| [We, have, a, lon...|
| [We, have, a, lon...|
| [We, have, a, lon...|
| [We, have, a, lon...|
| [We, have, a, lon...|
| [We, have, a, lon...|
| [We, have, a, lon...|
+---------------------+



In [0]:
textDF.createOrReplaceTempView("textDFTable")

In [0]:
spark.sql("""
SELECT 
split(Description,' ')
FROM 
textDFTable
""").show()

+---------------------+
|split(Description,  )|
+---------------------+
| [We, have, a, lon...|
| [We, have, a, lon...|
| [We, have, a, lon...|
| [We, have, a, lon...|
| [We, have, a, lon...|
| [We, have, a, lon...|
| [We, have, a, lon...|
| [We, have, a, lon...|
| [We, have, a, lon...|
| [We, have, a, lon...|
+---------------------+



In [0]:
my_collection = "Postgraduate program in Data Science and Optimization".split(" ")
words = spark.sparkContext.parallelize(my_collection)

In [0]:
words.take(3)

['Postgraduate', 'program', 'in']

In [0]:
supplementData = {"Postgraduate":100,"program":250,"Data":225,"Optimization":-100,"Science":1220}


In [0]:
suppleBroadcast = spark.sparkContext.broadcast(supplementData)

In [0]:
suppleBroadcast.value

{'Postgraduate': 100,
 'program': 250,
 'Data': 225,
 'Optimization': -100,
 'Science': 1220}

In [0]:
words.map(lambda x : (x,suppleBroadcast.value.get(x,0))).sortBy(lambda wordPair: wordPair[1]).collect()

[('Optimization', -100),
 ('in', 0),
 ('and', 0),
 ('Postgraduate', 100),
 ('Data', 225),
 ('program', 250),
 ('Science', 1220)]

In [0]:
spark

In [0]:
cwgDF = spark.read.format('csv').option('header','true').option('inferSchema','true').load("./Data/XXI_Commonwealth_Games.csv")

In [0]:
cwgDF.show(2)

+---+----------+----------+----+------+------+-----+
|Seq|NationCode|NationName|Gold|Silver|Bronze|Total|
+---+----------+----------+----+------+------+-----+
|  1|       AUS| Australia|  60|    45|    46|  151|
|  2|       ENG|   England|  28|    31|    24|   83|
+---+----------+----------+----+------+------+-----+
only showing top 2 rows



In [0]:
cwgDF.printSchema()

root
 |-- Seq: integer (nullable = true)
 |-- NationCode: string (nullable = true)
 |-- NationName: string (nullable = true)
 |-- Gold: integer (nullable = true)
 |-- Silver: integer (nullable = true)
 |-- Bronze: integer (nullable = true)
 |-- Total: integer (nullable = true)



In [0]:
cwgDF.show(5)

+---+----------+------------+----+------+------+-----+
|Seq|NationCode|  NationName|Gold|Silver|Bronze|Total|
+---+----------+------------+----+------+------+-----+
|  1|       AUS|   Australia|  60|    45|    46|  151|
|  2|       ENG|     England|  28|    31|    24|   83|
|  3|       IND|       India|  14|     6|     9|   29|
|  4|       CAN|      Canada|  11|    26|    19|   56|
|  5|       RSA|South Africa|  11|     9|    12|   32|
+---+----------+------------+----+------+------+-----+
only showing top 5 rows



In [0]:
accInd = spark.sparkContext.accumulator(0)

In [0]:
accInd.value

0

In [0]:
def accIndFunc(each_row):
  countryCD = each_row["NationCode"]
  list_ctrys = ['IND','PAK','SRI','BAN']
  if countryCD in list_ctrys:
    accInd.add(each_row['Total'])

In [0]:
cwgDF.foreach(lambda each_row:accIndFunc(each_row))

In [0]:
accInd.value

38

In [0]:
tcs_bo_df = spark.read.format('csv')\
.option('header','true')\
.option('inferSchema','true')\
.load("./Data/TCS_BO.csv")

In [0]:
tcs_bo_df.show(5)

+----------+---------+---------+---------+---------+---------+------+
|      Date|     Open|     High|      Low|    Close|Adj Close|Volume|
+----------+---------+---------+---------+---------+---------+------+
|2002-01-14|38.500000|39.500000|38.062500|38.400002|20.948002| 83688|
|2002-01-15|38.112499|38.724998|37.150002|37.412498|20.409311| 47496|
|2002-01-16|38.049999|38.500000|37.125000|37.700001|20.566145| 51624|
|2002-01-17|36.250000|38.750000|36.250000|38.337502|20.913918| 85840|
|2002-01-18|38.750000|39.974998|38.150002|38.549999|21.029835| 78928|
+----------+---------+---------+---------+---------+---------+------+
only showing top 5 rows



In [0]:
tcs_bo_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)



In [0]:
tcs_bo_df = tcs_bo_df.select(col('Date').cast('date'),
                             col('Open').cast('double'),
                             col('High').cast('double'),
                             col('Low').cast('double'),
                             col('Close').cast('double'),
                             col('Adj Close').cast('double'),
                             col('Volume').cast("int"))

In [0]:
tcs_bo_df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [0]:
tcs_bo_df = tcs_bo_df.withColumnRenamed("Adj Close","AdjClose")
tcs_bo_df = tcs_bo_df.withColumnRenamed("date","Stock_Date")

In [0]:
tcs_bo_df.write.format('json').mode('overwrite').save('./Data/tcs_bo.json')

In [0]:
tcs_bo_df.write.format('parquet').mode('overwrite').save('./Data/tcs_bo.parquet')

In [0]:
tcs_bo_json_df = spark.read.format("json")\
.option('inferSchema','true')\
.load("./Data/tcs_bo.json")

In [0]:
tcs_bo_json_df.show(5)

+---------+---------+---------+---------+---------+----------+------+
| AdjClose|    Close|     High|      Low|     Open|Stock_Date|Volume|
+---------+---------+---------+---------+---------+----------+------+
|20.948002|38.400002|     39.5|  38.0625|     38.5|2002-01-14| 83688|
|20.409311|37.412498|38.724998|37.150002|38.112499|2002-01-15| 47496|
|20.566145|37.700001|     38.5|   37.125|38.049999|2002-01-16| 51624|
|20.913918|38.337502|    38.75|    36.25|    36.25|2002-01-17| 85840|
|21.029835|38.549999|39.974998|38.150002|    38.75|2002-01-18| 78928|
+---------+---------+---------+---------+---------+----------+------+
only showing top 5 rows



### Connect to mysql database

In [0]:
userDetails = {'userName':'fai1001','pwd':'csf&231@kj'}
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.jars", "/home/jayantm/Spark/SparkSQL/mysql-connector-java-8.0.22.jar") \
    .master("local").appName("PySpark_MySQL_test").getOrCreate()

products_df = spark.read.format("jdbc").option("url", "jdbc:mysql://125.99.159.93:6033/insofe_productsDB") \
    .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "products") \
    .option("user", userDetails['userName']).option("password", userDetails['pwd']).load()

In [0]:
products_df.show(3)

+---------+-----------+---------+--------+-----+
|productID|productCode|     name|quantity|price|
+---------+-----------+---------+--------+-----+
|     1001|        PEN|  Pen Red|    4950| 1.23|
|     1002|        PEN| Pen Blue|    8000| 1.13|
|     1003|        PEN|Pen Black|    2000| 1.13|
+---------+-----------+---------+--------+-----+
only showing top 3 rows

