<a href="https://colab.research.google.com/github/pksX01/PySpark_Tutorials/blob/main/PySpark_Advanced.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#**PySpark Setup**

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 38 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 55.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805911 sha256=58a87e217460e1a74c39d7b29703581ffc711b241b41fd8928fafcf4ab8964a2
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


#**Spark Session**

In [9]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("PySpark DataFrame Practice")\
        .config('spark.ui.port', '4050')\
        .enableHiveSupport()\
        .getOrCreate()

#Spark SQL

In [None]:
spark.sql("create table tbl1 (id string)")

DataFrame[]

In [None]:
spark.sql("create table tbl2 (id string)")

DataFrame[]

In [None]:
#spark.sql("truncate table tbl1")

DataFrame[]

In [None]:
spark.sql("insert into tbl1 values (1)")
spark.sql("insert into tbl1 values (1)")
spark.sql("insert into tbl1 values (2)")
spark.sql("insert into tbl1 values (3)")
spark.sql("insert into tbl1 values (NULL)")
spark.sql("insert into tbl1 values ('')")

DataFrame[]

In [None]:
spark.sql("insert into tbl1 values ('')")

DataFrame[]

In [None]:
spark.sql("insert into tbl2 values (1)")
spark.sql("insert into tbl2 values (1)")
spark.sql("insert into tbl2 values (2)")
spark.sql("insert into tbl2 values (3)")
spark.sql("insert into tbl2 values (NULL)")
spark.sql("insert into tbl2 values ('')")

DataFrame[]

In [None]:
spark.sql("select * from tbl1").show()

+----+
|  id|
+----+
|   1|
|   3|
|   2|
|    |
|null|
|   1|
+----+



In [None]:
spark.sql("select * from tbl2").show()

+----+
|  id|
+----+
|   3|
|   1|
|   2|
|   1|
|    |
|null|
+----+



In [None]:
spark.sql("select * from tbl1 join tbl2 on tbl1.id = tbl2.id order by tbl1.id").show()

+---+---+
| id| id|
+---+---+
|   |   |
|  1|  1|
|  1|  1|
|  1|  1|
|  1|  1|
|  2|  2|
|  3|  3|
+---+---+



In [None]:
spark.sql("select * from tbl1 left join tbl2 on tbl1.id = tbl2.id order by tbl1.id").show()

+----+----+
|  id|  id|
+----+----+
|null|null|
|    |    |
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   2|   2|
|   3|   3|
+----+----+



In [None]:
spark.sql("select * from tbl1 right join tbl2 on tbl1.id = tbl2.id order by tbl1.id").show()

+----+----+
|  id|  id|
+----+----+
|null|null|
|    |    |
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   2|   2|
|   3|   3|
+----+----+



In [None]:
spark.sql("select * from tbl1 full outer join tbl2 on tbl1.id = tbl2.id order by tbl1.id").show()

+----+----+
|  id|  id|
+----+----+
|null|null|
|null|null|
|    |    |
|   1|   1|
|   1|   1|
|   1|   1|
|   1|   1|
|   2|   2|
|   3|   3|
+----+----+



In [None]:
#catalog is used to list all the data inside the cluster
spark.catalog.listTables() #displays the list of tables inside the cluster

[Table(name='tbl1', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='tbl2', database='default', description=None, tableType='MANAGED', isTemporary=False)]

#**DataFrame**

##Creating DataFrame from existing table in Catalog

In [None]:
df_catalog = spark.table("tbl_name")

##Empty RDD and DataFrame

###Creating Empty RDD using emptyRDD()

In [None]:
empty_rdd = spark.sparkContext.emptyRDD()
empty_rdd.collect()

[]

###Creating empty RDD using parallelize() 

In [None]:
empty_rdd_1 = spark.sparkContext.parallelize([])
empty_rdd_1.collect()

[]

###Creating Empty DataFrame from empty rdd with schema using Struct Type

In [None]:
schema = StructType([
                     StructField('Name', StringType(), True),
                     StructField('Age', IntegerType(), True)
])

In [None]:
empty_df_using_struct_type = spark.createDataFrame(empty_rdd, schema)
empty_df_using_struct_type.show()

+----+---+
|Name|Age|
+----+---+
+----+---+



###Converting empty RDD to DataFrame with schema 

In [None]:
converted_empty_df_with_schema = empty_rdd.toDF(schema)
converted_empty_df_with_schema.show()

+----+---+
|Name|Age|
+----+---+
+----+---+



###Creating empty DataFrame with schema without using RDD

In [None]:
new_empty_df_with_schema = spark.createDataFrame([], schema)
new_empty_df_with_schema.show()

+----+---+
|Name|Age|
+----+---+
+----+---+



###Creating empty DataFrame without schema and without using RDD

In [None]:
#StructType([]) is required to create empty schema because creating empty DataFrame using createDataFrame requires schema even though 
#schema is empty
new_empty_df_without_schema = spark.createDataFrame([], StructType([]))
new_empty_df_without_schema.show()

++
||
++
++



##DataFrame having Data

Spark DataFrame can be crated in multiple ways: <br>
1. From RDD <br>
  (a) [using toDF() method](https://colab.research.google.com/drive/1fs0P0MSBtGVZSe7odCa_EyZVy-wNv7wh#scrollTo=0vEkEby03TsJ) <br>
  (b) [using Row object and toDF() method ](https://colab.research.google.com/drive/1fs0P0MSBtGVZSe7odCa_EyZVy-wNv7wh#scrollTo=xxlxvUhw3Tsf)<br>
  (c) using SparkSession.createDataFrame() method<br>

2. From List <br>
  (a) [using SparkSession.createDataFrame() method](https://colab.research.google.com/drive/1fs0P0MSBtGVZSe7odCa_EyZVy-wNv7wh#scrollTo=A3NpL2KB3TtC&line=3&uniqifier=1) <br>
  (b) using StructField and StructType <br>

3. From Row object using createDataFrame() <br>

4. From different type of files such as csv, text, json etc. using read() method

5. [From Pandas DataFrame](https://colab.research.google.com/drive/1fs0P0MSBtGVZSe7odCa_EyZVy-wNv7wh#scrollTo=4ACfCbkV3Ttd&line=1&uniqifier=1)


### 2(b) From List using StructField and StructType

In [10]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [11]:
avengers_data = [
    ('Iron Man', 'Tony Stark', 1),
    ('Captain America', 'Steve Rogers', 1),
    ('Thor', 'Thor', 1),
    ('Hulk', 'Bruce Banner', 1),
    ('Black Widow', 'Natasha Romonoff', 2),
    ('Hawkayee', 'Clint Berton', 2),
    ('Winter Soldier', 'Bucky', 2),
    ('Spider Man', 'Peter Parker', 3),
    ('Ant Man', 'Scot Lang', 3),
    ('Falcon', 'Sam Wilson', 3),
    ('Vision', 'Vision', 3),
    ('Scarlet Witch', 'Wanda Maximoff', 3)
]

In [12]:
avengers_schema = StructType([
                              StructField('Avengers\' Name', StringType(), True),
                              StructField('Real Name', StringType(), True),
                              StructField('Generation', IntegerType(), True)
])

In [13]:
avengers_df = spark.createDataFrame(data = avengers_data, schema = avengers_schema)
avengers_df.show()

+---------------+----------------+----------+
| Avengers' Name|       Real Name|Generation|
+---------------+----------------+----------+
|       Iron Man|      Tony Stark|         1|
|Captain America|    Steve Rogers|         1|
|           Thor|            Thor|         1|
|           Hulk|    Bruce Banner|         1|
|    Black Widow|Natasha Romonoff|         2|
|       Hawkayee|    Clint Berton|         2|
| Winter Soldier|           Bucky|         2|
|     Spider Man|    Peter Parker|         3|
|        Ant Man|       Scot Lang|         3|
|         Falcon|      Sam Wilson|         3|
|         Vision|          Vision|         3|
|  Scarlet Witch|  Wanda Maximoff|         3|
+---------------+----------------+----------+



Nested StructType and ArrayType and MapType inside StructType

In [14]:
from pyspark.sql.types import StructType, StructField, ArrayType, MapType, StringType, IntegerType
structSchema = StructType([
                           StructField("Name", StructType([
                                                           StructField("First Name", StringType(), True),
                                                           StructField("Middle Name", StringType(), True),
                                                           StructField("Last Name", StringType(), True)
                           ])),
                           StructField("Marks", ArrayType(IntegerType()), True),
                           StructField("Passing Status", MapType(StringType(), StringType()), True)
])

In [15]:
nestedData = [
              (("Pavan", "Kumar", "Singh"), [76, 85, 92, 95, 90], {"Passed" : "Yes"}),
              (("Unknown", "", ""), [32, 50, 67, 21, 72], {"Passed" : "No"})
]

In [16]:
nestedStructDF = spark.createDataFrame(data = nestedData, schema = structSchema)

In [17]:
nestedStructDF.printSchema()

root
 |-- Name: struct (nullable = true)
 |    |-- First Name: string (nullable = true)
 |    |-- Middle Name: string (nullable = true)
 |    |-- Last Name: string (nullable = true)
 |-- Marks: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- Passing Status: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [18]:
nestedStructDF.show()

+--------------------+--------------------+---------------+
|                Name|               Marks| Passing Status|
+--------------------+--------------------+---------------+
|{Pavan, Kumar, Si...|[76, 85, 92, 95, 90]|{Passed -> Yes}|
|       {Unknown, , }|[32, 50, 67, 21, 72]| {Passed -> No}|
+--------------------+--------------------+---------------+



In [34]:
nestedStructDF.select('name.First name').show()

+----------+
|First name|
+----------+
|     Pavan|
|   Unknown|
+----------+



Adding and changing the struct of the DataFrame

In [23]:
def passingCheck(marks):
  for i in marks:
    if i < 33:
      return False
  return True

In [24]:
def combineStruct(struct):
  combinedStruct = ''
  for ele in struct:
    combinedStruct += ele + ' '
  return combinedStruct

In [26]:
from pyspark.sql.functions import col, udf, when
from pyspark.sql.types import BooleanType

sum_udf = udf(lambda x : sum(x))
passingCheckUdf = udf(lambda x : passingCheck(x), BooleanType())
combinedStructUdf = udf(lambda x : combineStruct(x))

updatedStructDF = nestedStructDF.withColumn("Total Marks", sum_udf(col('Marks')))\
                                .withColumn("Result", when(passingCheckUdf(col("Marks")), "Passed").otherwise("Failed"))\
                                .withColumn("Name", combinedStructUdf(col("Name")))\
                                .drop("Passing Status")
updatedStructDF.show()

+------------------+--------------------+-----------+------+
|              Name|               Marks|Total Marks|Result|
+------------------+--------------------+-----------+------+
|Pavan Kumar Singh |[76, 85, 92, 95, 90]|        438|Passed|
|        Unknown   |[32, 50, 67, 21, 72]|        242|Failed|
+------------------+--------------------+-----------+------+



###4. From different type of files such as csv, text, json etc. using read() method

In [None]:
from google.colab import files
files.upload()

Saving healthcare-dataset-stroke-data.csv to healthcare-dataset-stroke-data.csv


{'healthcare-dataset-stroke-data.csv': b'id,gender,age,hypertension,heart_disease,ever_married,work_type,Residence_type,avg_glucose_level,bmi,smoking_status,stroke\n9046,Male,67,0,1,Yes,Private,Urban,228.69,36.6,formerly smoked,1\n51676,Female,61,0,0,Yes,Self-employed,Rural,202.21,N/A,never smoked,1\n31112,Male,80,0,1,Yes,Private,Rural,105.92,32.5,never smoked,1\n60182,Female,49,0,0,Yes,Private,Urban,171.23,34.4,smokes,1\n1665,Female,79,1,0,Yes,Self-employed,Rural,174.12,24,never smoked,1\n56669,Male,81,0,0,Yes,Private,Urban,186.21,29,formerly smoked,1\n53882,Male,74,1,1,Yes,Private,Rural,70.09,27.4,never smoked,1\n10434,Female,69,0,0,No,Private,Urban,94.39,22.8,never smoked,1\n27419,Female,59,0,0,Yes,Private,Rural,76.15,N/A,Unknown,1\n60491,Female,78,0,0,Yes,Private,Urban,58.57,24.2,Unknown,1\n12109,Female,81,1,0,Yes,Private,Rural,80.43,29.7,never smoked,1\n12095,Female,61,0,1,Yes,Govt_job,Rural,120.46,36.8,smokes,1\n12175,Female,54,0,0,Yes,Private,Urban,104.51,27.3,smokes,1\n8213,Mal

In [None]:
data = spark.read.option('header', 'true').csv('/content/healthcare-dataset-stroke-data.csv')

In [None]:
data.printSchema()

root
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- hypertension: string (nullable = true)
 |-- heart_disease: string (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: string (nullable = true)
 |-- bmi: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: string (nullable = true)



In [None]:
data.show(3)

+-----+------+---+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|   id|gender|age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|
+-----+------+---+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
| 9046|  Male| 67|           0|            1|         Yes|      Private|         Urban|           228.69|36.6|formerly smoked|     1|
|51676|Female| 61|           0|            0|         Yes|Self-employed|         Rural|           202.21| N/A|   never smoked|     1|
|31112|  Male| 80|           0|            1|         Yes|      Private|         Rural|           105.92|32.5|   never smoked|     1|
+-----+------+---+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
only showing top 3 rows



In [None]:
from pyspark.sql.functions import concat_ws, collect_set
data.select("gender", "ever_married").groupBy("gender").agg(concat_ws('-', collect_set("ever_married")).alias('ever_married')).show()

+------+------------+
|gender|ever_married|
+------+------------+
|Female|      No-Yes|
| Other|          No|
|  Male|      No-Yes|
+------+------------+



In [None]:
data.groupBy('gender', 'stroke').count().orderBy('gender', 'stroke').show()

+------+------+-----+
|gender|stroke|count|
+------+------+-----+
|Female|     0| 2853|
|Female|     1|  141|
|  Male|     0| 2007|
|  Male|     1|  108|
| Other|     0|    1|
+------+------+-----+



In [None]:
from pyspark.sql.functions import sum, count, col

In [None]:
data.groupBy('gender', 'stroke') \
    .count() \
    .withColumnRenamed('count', 'count_per_gender_and_stroke') \
    .withColumn('percentage_per_gender_and_stroke', (col('count_per_gender_and_stroke') / data.count()) * 100) \
    .show()

+------+------+---------------------------+--------------------------------+
|gender|stroke|count_per_gender_and_stroke|percentage_per_gender_and_stroke|
+------+------+---------------------------+--------------------------------+
|  Male|     1|                        108|              2.1135029354207435|
|Female|     0|                       2853|              55.831702544031316|
| Other|     0|                          1|            0.019569471624266144|
|  Male|     0|                       2007|               39.27592954990215|
|Female|     1|                        141|               2.759295499021526|
+------+------+---------------------------+--------------------------------+



In [None]:
data.describe().show()

+-------+-----------------+------+------------------+------------------+-------------------+------------+---------+--------------+------------------+------------------+--------------+-------------------+
|summary|               id|gender|               age|      hypertension|      heart_disease|ever_married|work_type|Residence_type| avg_glucose_level|               bmi|smoking_status|             stroke|
+-------+-----------------+------+------------------+------------------+-------------------+------------+---------+--------------+------------------+------------------+--------------+-------------------+
|  count|             5110|  5110|              5110|              5110|               5110|        5110|     5110|          5110|              5110|              5110|          5110|               5110|
|   mean|36517.82935420744|  null|43.226614481409015|0.0974559686888454|0.05401174168297456|        null|     null|          null|106.14767710371804|28.893236911794673|          null| 

In [None]:
data.dtypes

[('id', 'string'),
 ('gender', 'string'),
 ('age', 'string'),
 ('hypertension', 'string'),
 ('heart_disease', 'string'),
 ('ever_married', 'string'),
 ('work_type', 'string'),
 ('Residence_type', 'string'),
 ('avg_glucose_level', 'string'),
 ('bmi', 'string'),
 ('smoking_status', 'string'),
 ('stroke', 'string')]

In [None]:
data.distinct().show() #displays a dataframe containing the distinct values of each column

+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|   id|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|
+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|18937|  Male|  79|           0|            0|         Yes|      Private|         Rural|           114.77| N/A|formerly smoked|     1|
|42996|Female|  36|           0|            0|          No|     Govt_job|         Rural|           126.82|23.3|   never smoked|     0|
|59906|Female|  40|           0|            0|         Yes|      Private|         Rural|            139.9|31.7|         smokes|     0|
|29010|  Male|   5|           0|            0|          No|     children|         Rural|           100.52|17.2|        Unknown|     0|
| 3590|Female|  28|           1|            0|         

In [None]:
data.select('gender').distinct().show() #getting distinct values of gender column

+------+
|gender|
+------+
|Female|
| Other|
|  Male|
+------+



In [None]:
data.select('gender', 'hypertension', 'heart_disease', 'ever_married', 'work_type', 'Residence_type', 'smoking_status', 'stroke') \
    .distinct().show() #displaying distinct values of multiple columns in the form of dataframe

+------+------------+-------------+------------+-------------+--------------+---------------+------+
|gender|hypertension|heart_disease|ever_married|    work_type|Residence_type| smoking_status|stroke|
+------+------------+-------------+------------+-------------+--------------+---------------+------+
|Female|           0|            1|         Yes|Self-employed|         Rural|   never smoked|     0|
|  Male|           1|            0|          No|      Private|         Urban|   never smoked|     1|
|  Male|           0|            0|         Yes|      Private|         Rural|   never smoked|     1|
|  Male|           1|            0|         Yes|     Govt_job|         Urban|formerly smoked|     1|
|  Male|           1|            0|         Yes|Self-employed|         Rural|formerly smoked|     1|
|Female|           0|            0|         Yes|      Private|         Urban|        Unknown|     1|
|Female|           1|            0|         Yes|     Govt_job|         Urban|   never smoke

#Experiment betweem 2 dataframes

In [None]:
from google.colab import files
files.upload()

Saving links.csv to links.csv
Saving movies.csv to movies.csv
Saving ratings.csv to ratings.csv
Saving README.txt to README.txt
Saving tags.csv to tags.csv


 'links.csv': b'movieId,imdbId,tmdbId\r\n1,0114709,862\r\n2,0113497,8844\r\n3,0113228,15602\r\n4,0114885,31357\r\n5,0113041,11862\r\n6,0113277,949\r\n7,0114319,11860\r\n8,0112302,45325\r\n9,0114576,9091\r\n10,0113189,710\r\n11,0112346,9087\r\n12,0112896,12110\r\n13,0112453,21032\r\n14,0113987,10858\r\n15,0112760,1408\r\n16,0112641,524\r\n17,0114388,4584\r\n18,0113101,5\r\n19,0112281,9273\r\n20,0113845,11517\r\n21,0113161,8012\r\n22,0112722,1710\r\n23,0112401,9691\r\n24,0114168,12665\r\n25,0113627,451\r\n26,0114057,16420\r\n27,0114011,9263\r\n28,0114117,17015\r\n29,0112682,902\r\n30,0115012,37557\r\n31,0112792,9909\r\n32,0114746,63\r\n34,0112431,9598\r\n36,0112818,687\r\n38,0113442,33689\r\n39,0112697,9603\r\n40,0112749,34615\r\n41,0114279,31174\r\n42,0112819,11443\r\n43,0114272,35196\r\n44,0113855,9312\r\n45,0114681,577\r\n46,0113347,11861\r\n47,0114369,807\r\n48,0114148,10530\r\n49,0114916,8391\r\n50,0114814,629\r\n52,0113819,11448\r\n53,0110299,49133\r\n54,0112499,26441\r\n55,0113158

In [None]:
links_data = spark.read.option('header', 'true').csv('/content/links.csv')
movies_data = spark.read.option('header', 'true').csv('/content/movies.csv')
ratings_data = spark.read.option('header', 'true').csv('/content/ratings.csv')
tags_data = spark.read.option('header', 'true').csv('/content/tags.csv')

In [None]:
links_data.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- imdbId: string (nullable = true)
 |-- tmdbId: string (nullable = true)



In [None]:
movies_data.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [None]:
ratings_data.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [None]:
tags_data.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [None]:
movies_data.count()

9742

In [None]:
ratings_data.count()

100836

##Using left semi join, left anti join and union
Adding a column 'rated' with value 'Yes' for all the movieId from movies_data which are also present in rating_data and 'No' for all the movieId from movies_data which are not present in rating_data

In [None]:
#Adding a column 'rated' with value 'Yes' for all the movieId from movies_data which are also present in rating_data
from pyspark.sql.functions import when, lit
rated_movies_data = movies_data.join(ratings_data, movies_data.movieId == ratings_data.movieId, "leftsemi").withColumn('rated', lit('Yes'))
rated_movies_data.show()

+-------+--------------------+--------------------+-----+
|movieId|               title|              genres|rated|
+-------+--------------------+--------------------+-----+
|      1|    Toy Story (1995)|Adventure|Animati...|  Yes|
|      2|      Jumanji (1995)|Adventure|Childre...|  Yes|
|      3|Grumpier Old Men ...|      Comedy|Romance|  Yes|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|  Yes|
|      5|Father of the Bri...|              Comedy|  Yes|
|      6|         Heat (1995)|Action|Crime|Thri...|  Yes|
|      7|      Sabrina (1995)|      Comedy|Romance|  Yes|
|      8| Tom and Huck (1995)|  Adventure|Children|  Yes|
|      9| Sudden Death (1995)|              Action|  Yes|
|     10|    GoldenEye (1995)|Action|Adventure|...|  Yes|
|     11|American Presiden...|Comedy|Drama|Romance|  Yes|
|     12|Dracula: Dead and...|       Comedy|Horror|  Yes|
|     13|        Balto (1995)|Adventure|Animati...|  Yes|
|     14|        Nixon (1995)|               Drama|  Yes|
|     15|Cutth

In [None]:
#Adding a column 'rated' with value 'No' for all the movieId from movies_data which are not present in rating_data
unrated_movies_data = movies_data.join(ratings_data, movies_data.movieId == ratings_data.movieId, "leftanti").withColumn("rated", lit("No"))
unrated_movies_data.show()

+-------+--------------------+--------------------+-----+
|movieId|               title|              genres|rated|
+-------+--------------------+--------------------+-----+
|   1076|Innocents, The (1...|Drama|Horror|Thri...|   No|
|   2939|      Niagara (1953)|      Drama|Thriller|   No|
|   3338|For All Mankind (...|         Documentary|   No|
|   3456|Color of Paradise...|               Drama|   No|
|   4194|I Know Where I'm ...|   Drama|Romance|War|   No|
|   5721|  Chosen, The (1981)|               Drama|   No|
|   6668|Road Home, The (W...|       Drama|Romance|   No|
|   6849|      Scrooge (1970)|Drama|Fantasy|Mus...|   No|
|   7020|        Proof (1991)|Comedy|Drama|Romance|   No|
|   7792|Parallax View, Th...|            Thriller|   No|
|   8765|This Gun for Hire...|Crime|Film-Noir|T...|   No|
|  25855|Roaring Twenties,...|Crime|Drama|Thriller|   No|
|  26085|Mutiny on the Bou...|Adventure|Drama|R...|   No|
|  30892|In the Realms of ...|Animation|Documen...|   No|
|  32160|Twent

In [None]:
rated_movies_data.count()

9724

In [None]:
unrated_movies_data.count()

18

In [None]:
movies_data_with_rating = rated_movies_data.union(unrated_movies_data)
movies_data_with_rating.count()

9742

In [None]:
movies_data_with_rating.orderBy("rated").show()

+-------+--------------------+--------------------+-----+
|movieId|               title|              genres|rated|
+-------+--------------------+--------------------+-----+
|  25855|Roaring Twenties,...|Crime|Drama|Thriller|   No|
|   6668|Road Home, The (W...|       Drama|Romance|   No|
|   8765|This Gun for Hire...|Crime|Film-Noir|T...|   No|
|  32160|Twentieth Century...|              Comedy|   No|
|   3338|For All Mankind (...|         Documentary|   No|
|   5721|  Chosen, The (1981)|               Drama|   No|
|   7020|        Proof (1991)|Comedy|Drama|Romance|   No|
|   7792|Parallax View, Th...|            Thriller|   No|
|  26085|Mutiny on the Bou...|Adventure|Drama|R...|   No|
|  30892|In the Realms of ...|Animation|Documen...|   No|
|  32371|Call Northside 77...|Crime|Drama|Film-...|   No|
|  34482|Browning Version,...|               Drama|   No|
|   1076|Innocents, The (1...|Drama|Horror|Thri...|   No|
|   2939|      Niagara (1953)|      Drama|Thriller|   No|
|   3456|Color

##Using left join
Adding a column 'rated' with value 'Yes' for all the movieId from movies_data which are also present in rating_data and 'No' for all the movieId from movies_data which are not present in rating_data

In [None]:
from pyspark.sql.functions import col, when
updated_movies_data = movies_data.alias("movies").join(ratings_data.alias("ratings"), \
                                                   col("movies.movieId") == col("ratings.movieId"),\
                                                   "left")\
                            .select(col("movies.*"), col("ratings.rating"))\
                            .withColumn("rated", when(col("rating").isNull(), "No")\
                                                .otherwise("Yes"))
updated_movies_data.orderBy("rating").show()

+-------+--------------------+--------------------+------+-----+
|movieId|               title|              genres|rating|rated|
+-------+--------------------+--------------------+------+-----+
|   3338|For All Mankind (...|         Documentary|  null|   No|
|   2939|      Niagara (1953)|      Drama|Thriller|  null|   No|
|   5721|  Chosen, The (1981)|               Drama|  null|   No|
|   6668|Road Home, The (W...|       Drama|Romance|  null|   No|
|   6849|      Scrooge (1970)|Drama|Fantasy|Mus...|  null|   No|
|   3456|Color of Paradise...|               Drama|  null|   No|
|   1076|Innocents, The (1...|Drama|Horror|Thri...|  null|   No|
|   8765|This Gun for Hire...|Crime|Film-Noir|T...|  null|   No|
|  25855|Roaring Twenties,...|Crime|Drama|Thriller|  null|   No|
|  26085|Mutiny on the Bou...|Adventure|Drama|R...|  null|   No|
|  30892|In the Realms of ...|Animation|Documen...|  null|   No|
|  32160|Twentieth Century...|              Comedy|  null|   No|
|  32371|Call Northside 7

In [None]:
updated_movies_data.groupBy("rated").count().show()

+-----+------+
|rated| count|
+-----+------+
|   No|    18|
|  Yes|100836|
+-----+------+



In [None]:
updated_movies_data.filter(updated_movies_data.rating.isNull()).show()

+-------+--------------------+--------------------+------+-----+
|movieId|               title|              genres|rating|rated|
+-------+--------------------+--------------------+------+-----+
|   1076|Innocents, The (1...|Drama|Horror|Thri...|  null|   No|
|   2939|      Niagara (1953)|      Drama|Thriller|  null|   No|
|   3338|For All Mankind (...|         Documentary|  null|   No|
|   3456|Color of Paradise...|               Drama|  null|   No|
|   4194|I Know Where I'm ...|   Drama|Romance|War|  null|   No|
|   5721|  Chosen, The (1981)|               Drama|  null|   No|
|   6668|Road Home, The (W...|       Drama|Romance|  null|   No|
|   6849|      Scrooge (1970)|Drama|Fantasy|Mus...|  null|   No|
|   7020|        Proof (1991)|Comedy|Drama|Romance|  null|   No|
|   7792|Parallax View, Th...|            Thriller|  null|   No|
|   8765|This Gun for Hire...|Crime|Film-Noir|T...|  null|   No|
|  25855|Roaring Twenties,...|Crime|Drama|Thriller|  null|   No|
|  26085|Mutiny on the Bo

##Getting the list of all columns having only NULL or NOT NULL values in a DataFrame

In [None]:
from pyspark.sql.functions import count, when
df_count = rated_movies_data.count()
null_check_df = rated_movies_data.select([(count(when(rated_movies_data[column].isNull(), column)) == df_count).alias(column)\
                                      for column in rated_movies_data.columns])

In [None]:
null_check_df.show()

+-------+-----+------+-----+
|movieId|title|genres|rated|
+-------+-----+------+-----+
|  false|false| false|false|
+-------+-----+------+-----+



In [None]:
null_check_df.collect()

[Row(movieId=False, title=False, genres=False, rated=False)]

In [None]:
null_check_df.collect()[0].asDict()

{'genres': False, 'movieId': False, 'rated': False, 'title': False}

In [None]:
not_null_column_lst = [key for key, val in null_check_df.collect()[0].asDict().items() if val == False]
print(not_null_column_lst)

['movieId', 'title', 'genres', 'rated']


#SelectExpr
SelectExpr is equivalent to select method. SelectExpr takes sql expressions as arguments.

In [None]:
# Define avg_speed
avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")

# Select the correct columns
speed1 = flights.select("origin", "dest", "tailnum", avg_speed)

# Create the same table using a SQL expression
speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")

-------------------------------------------------------------------------
-------------------------------------------------------------------------
#RDD

In [None]:
rdd_text = spark.sparkContext.textFile('/content/healthcare-dataset-stroke-data.csv')

In [None]:
rdd_text.getNumPartitions()

1

In [None]:
rdd_text_part = spark.sparkContext.textFile('/content/healthcare-dataset-stroke-data.csv', minPartitions = 4)

In [None]:
rdd_text_part.getNumPartitions()

4

In [None]:
rdd_grouped = rdd_text.map(lambda line: line.split(',')).groupBy(lambda x: x[1])

In [None]:
print([(key, list(val)) for (key, val) in rdd_grouped.take(4)])

In [None]:
for key, val in rdd_grouped.collect():
  print((key, list(val)))

In [None]:
rdd_groupedByKey = rdd_text.map(lambda line: line.split(',')).map(lambda x: (x[1],1)).groupByKey()

In [None]:
print(list(x[0], list(x[1])) for x in rdd_groupedByKey.collect())

In [None]:
for x in rdd_groupedByKey.collect():
  print((x[0], list(x[1])))

In [None]:
rdd_groupedByKey.mapValues(sum).sortByKey().collect()

In [None]:
rdd_text.map(lambda line: line.split(',')).map(lambda x: ((x[1], x[-1]), 1)).groupByKey().mapValues(sum).collect()

[(('gender', 'stroke'), 1),
 (('Male', '1'), 108),
 (('Female', '1'), 141),
 (('Male', '0'), 2007),
 (('Female', '0'), 2853),
 (('Other', '0'), 1)]

In [None]:
rdd_text_part.map(lambda line: line.split(',')).map(lambda x: ((x[1], x[-1]), 1)).groupByKey().mapValues(sum).collect()

[(('Male', '1'), 108),
 (('Female', '1'), 141),
 (('Male', '0'), 2007),
 (('Female', '0'), 2853),
 (('gender', 'stroke'), 1),
 (('Other', '0'), 1)]