In [1]:
!pip install mkl-service



In [1]:
import pyspark

In [3]:
import os



from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [4]:
spark = SparkSession.builder.master("local[*]") \
    .appName("prajwal") \
    .config("spark.driver.extraClassPath", "C:\\my_sql_jar\\mysql-connector-java-8.0.26.jar") \
    .getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x000001ED61122C70>


In [5]:
flight_df = spark.read.format("csv")\
            .option("header", "false")\
            .option("inferschema", "false")\
            .option("mode", "FAILFAST")\
            .load("flight_data.csv")
flight_df.show(5)

+-----------------+-------------------+-----+
|              _c0|                _c1|  _c2|
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
+-----------------+-------------------+-----+
only showing top 5 rows



In [6]:
flight_df_header = spark.read.format("csv")\
            .option("header", "true")\
            .option("inferschema", "false")\
            .option("mode", "FAILFAST")\
            .load("flight_data.csv")
flight_df_header.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [7]:
flight_df_header.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: string (nullable = true)



In [8]:
flight_df_header_schema = spark.read.format("csv")\
            .option("header", "true")\
            .option("inferschema", "true")\
            .option("mode", "FAILFAST")\
            .load("flight_data.csv")
flight_df_header_schema.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [9]:
flight_df_header_schema.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



In [10]:
my_schema = StructType([
                        StructField("DEST_COUNTRY_NAME", StringType(), True),
                        StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
                        StructField("count", IntegerType(), True)
])

In [11]:
flight_df_1 = spark.read.format("csv")\
            .option("header", "true")\
            .option("inferschema", "false")\
            .schema(my_schema)\
            .option("mode", "PERMISSIVE")\
            .load("flight_data.csv")
flight_df_1.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [12]:
%ls C:\Users\HP\Pyspark

 Volume in drive C has no label.
 Volume Serial Number is 6951-7C20

 Directory of C:\Users\HP\Pyspark

12/02/2023  08:56 AM    <DIR>          .
12/02/2023  08:56 AM    <DIR>          ..
12/01/2023  08:27 PM    <DIR>          .ipynb_checkpoints
12/01/2023  08:59 PM               220 corrupted_json.json
11/26/2023  06:42 PM               230 employee.csv
11/25/2023  11:21 PM             7,121 flight_data.csv
12/01/2023  08:37 PM               223 line_delimited_json.json
12/01/2023  08:47 PM               232 line_with_extra fields.json
12/01/2023  08:57 PM               310 Multi_line_correct.json
12/01/2023  08:57 PM               304 Multi_line_incorrect.json
10/15/2023  09:15 PM             5,561 Pyspark - Krish Naik's Tutorial.ipynb
12/02/2023  08:56 AM             9,164 Pyspark notes.ipynb
10/15/2023  10:57 AM               143 test1.csv
10/15/2023  10:58 AM               180 test2.csv
10/15/2023  10:58 AM               259 test3.csv
10/15/2023  10:58 AM             8,188 tips.csv

In [13]:
employee_df = spark.read.format("csv")\
            .option("header", "true")\
            .option("inferschema", "false")\
            .option("mode", "PERMISSIVE")\
            .load("employee.csv")
employee_df.show(5)

+---+--------+---+------+------------+--------+
| id|    name|age|salary|     address| nominee|
+---+--------+---+------+------------+--------+
|  1|  Manish| 26| 75000|       bihar|nominee1|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|
|  3|  Pritam| 22|150000|   Bangalore|   India|
|  4|Prantosh| 17|200000|     Kolkata|   India|
|  5|  Vikash| 31|300000|        null|nominee5|
+---+--------+---+------+------------+--------+



In [14]:
employee_schema = StructType([
                        StructField("id", IntegerType(), True),
                        StructField("name", StringType(), True),
                        StructField("age", IntegerType(), True),
                        StructField("salary", IntegerType(), True),
                        StructField("address", StringType(), True),
                        StructField("nominee", StringType(), True),
                        StructField("corrupted_records", StringType(), True)])

In [15]:
employee_df = spark.read.format("csv")\
            .option("header", "true")\
            .option("mode", "PERMISSIVE")\
            .schema(employee_schema)\
            .load("employee.csv")
employee_df.show(truncate = False)

+---+--------+---+------+------------+--------+-----------------+
|id |name    |age|salary|address     |nominee |corrupted_records|
+---+--------+---+------+------------+--------+-----------------+
|1  |Manish  |26 |75000 |bihar       |nominee1|null             |
|2  |Nikita  |23 |100000|uttarpradesh|nominee2|null             |
|3  |Pritam  |22 |150000|Bangalore   |India   |nominee3         |
|4  |Prantosh|17 |200000|Kolkata     |India   |nominee4         |
|5  |Vikash  |31 |300000|null        |nominee5|null             |
+---+--------+---+------+------------+--------+-----------------+



In [16]:
bad_data = employee_df.filter(col("corrupted_records").isNotNull())

In [17]:
bad_data.show()

+---+--------+---+------+---------+-------+-----------------+
| id|    name|age|salary|  address|nominee|corrupted_records|
+---+--------+---+------+---------+-------+-----------------+
|  3|  Pritam| 22|150000|Bangalore|  India|         nominee3|
|  4|Prantosh| 17|200000|  Kolkata|  India|         nominee4|
+---+--------+---+------+---------+-------+-----------------+



In [18]:
cleaned_data = employee_df.filter(col("corrupted_records").isNull())

In [19]:
cleaned_data.show()

+---+------+---+------+------------+--------+-----------------+
| id|  name|age|salary|     address| nominee|corrupted_records|
+---+------+---+------+------------+--------+-----------------+
|  1|Manish| 26| 75000|       bihar|nominee1|             null|
|  2|Nikita| 23|100000|uttarpradesh|nominee2|             null|
|  5|Vikash| 31|300000|        null|nominee5|             null|
+---+------+---+------+------------+--------+-----------------+



## JSON file


In [20]:
spark.read.format("json")\
            .option("inferSchema", "true")\
            .option("mode", "PERMISSIVE")\
            .load("line_delimited_json.json").show()

+---+--------+------+
|age|    name|salary|
+---+--------+------+
| 20|  Manish| 20000|
| 25|  Nikita| 21000|
| 16|  Pritam| 22000|
| 35|Prantosh| 25000|
| 67|  Vikash| 40000|
+---+--------+------+



In [21]:
spark.read.format("json")\
            .option("inferSchema", "true")\
            .option("mode", "PERMISSIVE")\
            .load("line_with_extra fields.json").show()

+---+------+--------+------+
|age|gender|    name|salary|
+---+------+--------+------+
| 20|  null|  Manish| 20000|
| 25|  null|  Nikita| 21000|
| 16|  null|  Pritam| 22000|
| 35|  null|Prantosh| 25000|
| 67|     M|  Vikash| 40000|
+---+------+--------+------+



In [22]:
spark.read.format("json")\
            .option("inferSchema", "true")\
            .option("mode", "PERMISSIVE")\
            .option("multiline", "true")\
            .load("Multi_line_correct.json").show()

+---+--------+------+
|age|    name|salary|
+---+--------+------+
| 20|  Manish| 20000|
| 25|  Nikita| 21000|
| 16|  Pritam| 22000|
| 35|Prantosh| 25000|
| 67|  Vikash| 40000|
+---+--------+------+



In [23]:
spark.read.format("json")\
            .option("inferSchema", "true")\
            .option("mode", "PERMISSIVE")\
            .option("multiline", "true")\
            .load("Multi_line_incorrect.json").show()

+---+------+------+
|age|  name|salary|
+---+------+------+
| 20|Manish| 20000|
+---+------+------+



In [24]:
spark.read.format("json")\
            .option("inferSchema", "true")\
            .option("mode", "PERMISSIVE")\
            .load("corrupted_json.json").show(truncate = False)

+----------------------------------------+----+--------+------+
|_corrupt_record                         |age |name    |salary|
+----------------------------------------+----+--------+------+
|null                                    |20  |Manish  |20000 |
|null                                    |25  |Nikita  |21000 |
|null                                    |16  |Pritam  |22000 |
|null                                    |35  |Prantosh|25000 |
|{"name":"Vikash","age":67,"salary":40000|null|null    |null  |
+----------------------------------------+----+--------+------+

