# **PySpark Ingestion + Egress + Dataloading Techniques**

In [4]:
from pyspark.sql import SparkSession

#MySql jdbc connector jar local path
mysql_connector_jar_path = "/home/hduser/install/mysql-connector-java.jar"

#Spark Session Creation
spark =  SparkSession.builder\
    .appName("Spark-Ingress-Egress-Dataloading-Practice")\
    .config("spark.jars", mysql_connector_jar_path) \
    .getOrCreate()

print(f"[INFO] SparkSession Object Memory Reference: {spark}")

[INFO] SparkSession Object Memory Reference: <pyspark.sql.session.SparkSession object at 0xffff4557f3d0>


## **1. Converting Unstructured/Semi Structured Data to Structured Data using RDD then to DF**

In [44]:
# Case1: Users entered course data in Unstructured format. We would like to know the distinct number of course, which course mostly wanted?

#Sample data => samplecourse.log
"""
Docker Java JavaScript React Spark Kafka SQL Git Go Python AI

AWS Azure TensorFlow PyTorch Android iOS Rust AI
Python Cloud AI Docker Kubernetes Rust
"""

#1. Covert the unstructured to structured data using RDD
sc = spark.sparkContext
unstruct_rdd1 = sc.textFile("file:///home/hduser/samplecourse.log")
struct_rdd2 = unstruct_rdd1.flatMap(lambda row:row.split(" ")) #['Python', 'Cloud', 'AI', 'Docker', 'Kubernetes']
struct_schema_rdd3 = struct_rdd2.map(lambda word:[word]) #[['Python'], ['Cloud'], ['AI'], ['Docker'], ['Kubernetes']]

#2. Covert the RDD into DF for further analysis
df1 = struct_schema_rdd3.toDF()
df1.createOrReplaceTempView("course_view")
df_view = spark.sql("select distinct _1 as distinct_courses from course_view")
df_view.show(5)

df_view = spark.sql("select _1 as cource, count(*) as no_of_people_interested from course_view group by _1")
df_view.show(5)




+----------------+
|distinct_courses|
+----------------+
|            Rust|
|          Docker|
|      JavaScript|
|         PyTorch|
|             AWS|
+----------------+
only showing top 5 rows

+----------+-----------------------+
|    cource|no_of_people_interested|
+----------+-----------------------+
|      Rust|                 104000|
|    Docker|                 104000|
|JavaScript|                  52000|
|   PyTorch|                  52000|
|       AWS|                  52000|
+----------+-----------------------+
only showing top 5 rows



                                                                                

In [45]:
# Assuming 'sc' is your SparkContext

# Your syslog data
data = """
Aug 6 10:05:01 my-server-name systemd[1]: Started User Manager for UID 1000.
Aug 6 10:05:02 my-server-name kernel: [ 10.123456] Bluetooth: BNEP (Ethernet Emulation) ver 1.3
Aug 6 10:05:03 my-server-name sshd[12345]: Accepted password for user1 from 192.168.1.5 port 55678 ssh2
Aug 6 10:05:04 my-server-name CRON[23456]: (root) CMD (command -v debian-sa1 > /dev/null && debian-sa1 1 1)
Aug 6 10:05:05 my-server-name postfix/smtpd[34567]: connect from mail-server.example.com[203.0.113.10]
Aug 6 10:05:06 my-server-name anacron[45678]: Job `cron.daily' terminated
Aug 6 10:05:07 my-server-name systemd[1]: user@1000.service: Succeeded.
Aug 6 10:05:08 my-server-name kernel: [ 20.987654] usb 1-1.2: new high-speed USB device number 3 using xhci_hcd
"""

# Write the sample data to a file
with open("/home/hduser/syslog", "w") as f:
    f.write(data)

# Read the file into an RDD
raw_log_rdd = sc.textFile("/home/hduser/syslog")

import re
from pyspark.sql import Row

def parse_log_line_rdd(line):
    """
    Parses a single log line using a regular expression.
    Returns a Row object with parsed fields.
    """
    regex = r"(\w{3}\s+\d+\s+\d{2}:\d{2}:\d{2})\s+([\w.-]+)\s+([\w/]+)(?:\[(\d+)\])?:\s+(.*)"
    match = re.match(regex, line)
    
    if match:
        # Extract fields from regex groups
        timestamp_str, hostname, process_name, pid_str, message = match.groups()
        pid = int(pid_str) if pid_str else None
        
        return Row(
            timestamp=timestamp_str,
            hostname=hostname,
            process=process_name,
            pid=pid,
            message=message.strip()
        )
    else:
        # For unparseable lines, return a Row with an error message
        return Row(timestamp=None, hostname=None, process=None, pid=None, message=line, parse_error=True)

# Apply the parsing function to the RDD
structured_log_rdd = raw_log_rdd.map(parse_log_line_rdd)

# Print some results to verify
print("Example structured RDD records:")
for row in structured_log_rdd.take(3):
    print(row)


Example structured RDD records:


Py4JJavaError: An error occurred while calling o1063.partitions.
: java.net.ConnectException: Call From localhost.localdomain/127.0.0.1 to localhost:54310 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
	at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:913)
	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:828)
	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1616)
	at org.apache.hadoop.ipc.Client.call(Client.java:1558)
	at org.apache.hadoop.ipc.Client.call(Client.java:1455)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:242)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:129)
	at com.sun.proxy.$Proxy41.getFileInfo(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:965)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
	at com.sun.proxy.$Proxy42.getFileInfo(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1739)
	at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1753)
	at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1750)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1765)
	at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:115)
	at org.apache.hadoop.fs.Globber.doGlob(Globber.java:349)
	at org.apache.hadoop.fs.Globber.glob(Globber.java:202)
	at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2142)
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:276)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:210)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at jdk.internal.reflect.GeneratedMethodAccessor91.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.ConnectException: Connection refused
	at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:205)
	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:586)
	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:711)
	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:833)
	at org.apache.hadoop.ipc.Client$Connection.access$3800(Client.java:414)
	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1677)
	at org.apache.hadoop.ipc.Client.call(Client.java:1502)
	... 49 more


## **2. Reading a CSV data and write into MySql(RDBMS) Database using JDBC Option**

In [5]:
###### Reading CSV data and write into DataFrame #######

# Sample Customer Info Data
"""
cd /home/hduser/custinfo.csv

4000001,Kristina,Chung,55,Pilot
4000002,Paige,Chen,77,Teacher
4000003,Sherri,Melton,34,Firefighter
4000004,Gretchen,Hill,66,Computer hardware engineer
4000005,Karen,Puckett,74,Lawyer
"""
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Schema Definition
custinfo_schema = StructType([StructField('custid', IntegerType(), True), StructField('first_name', StringType(), True), StructField('last_name', StringType(), True), StructField('age', IntegerType(), True), StructField('profession', StringType(), True)])

# CSV Data Read and storing it in DataFrame
df1 = spark.read.csv(path="file:///home/hduser/custinfo.csv",header=False,sep=",",inferSchema=False,schema=custinfo_schema)
df1.show(truncate=False,n=5)
print(f"[INFO] df1.count() = {df1.count()}")


+-------+----------+---------+---+--------------------------+
|custid |first_name|last_name|age|profession                |
+-------+----------+---------+---+--------------------------+
|4000001|Kristina  |Chung    |55 |Pilot                     |
|4000002|Paige     |Chen     |77 |Teacher                   |
|4000003|Sherri    |Melton   |34 |Firefighter               |
|4000004|Gretchen  |Hill     |66 |Computer hardware engineer|
|4000005|Karen     |Puckett  |74 |Lawyer                    |
+-------+----------+---------+---+--------------------------+
only showing top 5 rows

[INFO] df1.count() = 9999


In [6]:
###### Write the data into MySql DB ######

# JDBC Options
url1='jdbc:mysql://127.0.0.1:3306/stocksdb?createDatabaseIfNotExist=true'
dbproperties={'user':'root','password':'Root123$','driver':'com.mysql.cj.jdbc.Driver'}

# Write into DB
df1.write.jdbc(url=url1,properties=dbproperties,table="custinfo",mode="overwrite")
print("[INFO] CSV file data write into MySQL DB is successful.")


[INFO] CSV file data write into MySQL DB is successful.


                                                                                

In [7]:
###### Simple way to read the data from MySql/RDBMS DB using JDBC ######

# JDBC Options
url1='jdbc:mysql://127.0.0.1:3306/stocksdb'
dbproperties={'user':'root','password':'Root123$','driver':'com.mysql.cj.jdbc.Driver'}

# Read the data from RDBMS using query instead of direct table
table_query = "(select * from stocksdb.custinfo) as tablename"
df2_db = spark.read.jdbc(url=url1,properties=dbproperties,table=table_query)
df2_db.cache()
df2_db.show(truncate=False,n=5)

+-------+----------+---------+---+--------------------------+
|custid |first_name|last_name|age|profession                |
+-------+----------+---------+---+--------------------------+
|4000001|Kristina  |Chung    |55 |Pilot                     |
|4000002|Paige     |Chen     |77 |Teacher                   |
|4000003|Sherri    |Melton   |34 |Firefighter               |
|4000004|Gretchen  |Hill     |66 |Computer hardware engineer|
|4000005|Karen     |Puckett  |74 |Lawyer                    |
+-------+----------+---------+---+--------------------------+
only showing top 5 rows



In [8]:
###### Optimized way to read the data from any RDBMS DB using JDBC ######

#Question: How to improve performance for JDBC?
#partition, fetchsize, caching, pushdown optimization etc.,
#partitionColumn:, numberOfPartitions:, upperBound:, lowerBound, predicates, fetchsize..

# JDBC Options for performance optimization
url1='jdbc:mysql://127.0.0.1:3306/stocksdb'
dbproperties = {
    'user': 'root',
    'password': 'Root123$',
    'driver': 'com.mysql.cj.jdbc.Driver',
    # Performance optimization options (values as strings):
    'partitionColumn': 'custid',
    'lowerBound': '4000001',  # Column used to divide data into sections for parallel processing.
    'upperBound': '4000100',  # Minimum value for the partition column to start reading data.
    'numPartitions': '3',     # Maximum value for the partition column to start reading data.
    'pushDownPredicate': 'true',  # Sends filters (WHERE clauses) to the database for early processing.
    'pushDownAggregate': 'true',  # Sends aggregations (SUM, COUNT) to the database for early processing.
    'queryTimeout': '120',    # Maximum time (in seconds) a database query can run before timing out.
    'fetchSize': '10',        # Number of rows retrieved from the database in each batch.
    'isolationLevel': 'READ_COMMITTED' # Ensures only committed data is visible during a transaction.
}

# Read the data from RDBMS using query instead of direct table
table_query = "(select * from stocksdb.custinfo) as tablename"
df2_db = spark.read.jdbc(url=url1,properties=dbproperties,table=table_query)
df2_db.show(truncate=False,n=5)

+-------+----------+---------+---+--------------------------+
|custid |first_name|last_name|age|profession                |
+-------+----------+---------+---+--------------------------+
|4000001|Kristina  |Chung    |55 |Pilot                     |
|4000002|Paige     |Chen     |77 |Teacher                   |
|4000003|Sherri    |Melton   |34 |Firefighter               |
|4000004|Gretchen  |Hill     |66 |Computer hardware engineer|
|4000005|Karen     |Puckett  |74 |Lawyer                    |
+-------+----------+---------+---+--------------------------+
only showing top 5 rows



## **3. Schema Evoluation/Growing handling using columner file formats ORC/Parquet**

In [9]:
#ORC/PARQUET Other Properties

#Source is sending data on a daily basis, once in a while the schema of the data is evolving/growing
  #Example (Day1): exch~stock~price
  #Example (Day2): exch~stock~price~buyer
  #Example (Day3): stock~price~seller

#**mergeSchema: Orc/Parquet read all the datafiles headers and merge them into one header

In [10]:
# Sample data
day1 = """
exch~stock~price
NYSE~CLI~36.3
NYSE~ABC~36.3
"""

day2 = """
exch~stock~price~buyer
NYSE~CLI~37.3~Alan
NYSE~ABC~37.3~Harpar
"""

day3 = """
stock~price~seller
CLI~37.3~Jack
ABC~37.3~Ross
"""

"""
/home/hduser/stockdata_csv/
├── part-00000-01f262bb-27a7-465d-95ca-4fdb6e1986aa-c000.csv
└── _SUCCESS
"""

# Write the same data into CSV + Read the CSV + Write into ORC format (Append) + Read the ORC data (MergeSchema=True) 

# Day 1: exch~stock~price
lines_day1 = day1.strip().split('\n')
header_day1 = lines_day1[0].split('~')
data_rows_day1 = [line.split('~') for line in lines_day1[1:]]
df1 = spark.createDataFrame(data_rows_day1, header_day1)
df1.coalesce(1).write.csv(path="file:///home/hduser/stockdata_csv/",mode="overwrite",sep="~",header=True)

df_csv = spark.read.csv(path="file:///home/hduser/stockdata_csv/",pathGlobFilter="part-*.csv",sep="~",header=True)
print("[INFO] Day1 : Source CSV data")
df_csv.show()
df_csv.coalesce(1).write.orc(path="file:///home/hduser/stockdata_orc/",mode="overwrite") # Overwrite for the first time
df_orc = spark.read.orc(path="file:///home/hduser/stockdata_orc/",mergeSchema=True) # Schema Evoluation
print("[INFO] Day1 : ORC data read")
df_orc.show()                         

# Day 2: exch~stock~price~buyer
lines_day2 = day2.strip().split('\n')
header_day2 = lines_day2[0].split('~')
data_rows_day2 = [line.split('~') for line in lines_day2[1:]]
df2 = spark.createDataFrame(data_rows_day2, header_day2)
df2.coalesce(1).write.csv(path="file:///home/hduser/stockdata_csv/",mode="overwrite",sep="~",header=True)

df_csv = spark.read.csv(path="file:///home/hduser/stockdata_csv/",pathGlobFilter="part-*.csv",sep="~",header=True)
print("[INFO] Day2 : Source CSV data")
df_csv.show()
df_csv.coalesce(1).write.orc(path="file:///home/hduser/stockdata_orc/",mode="append") # Append for the Schema Evoluation
df_orc = spark.read.orc(path="file:///home/hduser/stockdata_orc/",mergeSchema=True) # Schema Evoluation
print("[INFO] Day2 : ORC data read with evolved schema")
df_orc.show()    

# Day 3: stock~price~seller
lines_day3 = day3.strip().split('\n')
header_day3 = lines_day3[0].split('~')
data_rows_day3 = [line.split('~') for line in lines_day3[1:]]
df3 = spark.createDataFrame(data_rows_day3, header_day3)
print("[INFO] Day3 : Source CSV data")
df3.show()
df3.coalesce(1).write.csv(path="file:///home/hduser/stockdata_csv/",mode="overwrite",sep="~",header=True)

df_csv = spark.read.csv(path="file:///home/hduser/stockdata_csv/",pathGlobFilter="part-*.csv",sep="~",header=True)
df_csv.coalesce(1).write.orc(path="file:///home/hduser/stockdata_orc/",mode="append") # Append for the Schema Evoluation
df_orc = spark.read.orc(path="file:///home/hduser/stockdata_orc/",mergeSchema=True) # Schema Evoluation
print("[INFO] Day3 : ORC data read evolved schema")
df_orc.show()    


                                                                                

[INFO] Day1 : Source CSV data
+----+-----+-----+
|exch|stock|price|
+----+-----+-----+
|NYSE|  CLI| 36.3|
|NYSE|  ABC| 36.3|
+----+-----+-----+

[INFO] Day1 : ORC data read
+----+-----+-----+
|exch|stock|price|
+----+-----+-----+
|NYSE|  CLI| 36.3|
|NYSE|  ABC| 36.3|
+----+-----+-----+

[INFO] Day2 : Source CSV data
+----+-----+-----+------+
|exch|stock|price| buyer|
+----+-----+-----+------+
|NYSE|  CLI| 37.3|  Alan|
|NYSE|  ABC| 37.3|Harpar|
+----+-----+-----+------+

[INFO] Day2 : ORC data read with evolved schema
+----+-----+-----+------+
|exch|stock|price| buyer|
+----+-----+-----+------+
|NYSE|  CLI| 37.3|  Alan|
|NYSE|  ABC| 37.3|Harpar|
|NYSE|  CLI| 36.3|  NULL|
|NYSE|  ABC| 36.3|  NULL|
+----+-----+-----+------+

[INFO] Day3 : Source CSV data
+-----+-----+------+
|stock|price|seller|
+-----+-----+------+
|  CLI| 37.3|  Jack|
|  ABC| 37.3|  Ross|
+-----+-----+------+

[INFO] Day3 : ORC data read evolved schema
+----+-----+-----+------+------+
|exch|stock|price| buyer|seller|
+-

## **4. Reading a JSON data with various options**

In [11]:
from pyspark.sql.types import DecimalType,BooleanType,ArrayType,DateType,TimestampType

# Data
samplejson = """
[
  {
    "id": 1,
    "name": "Alice",
    "age": 30,
    "salary": 50000.50,
    "isActive": true,
    "comments": "This is a comment.",
    "tags": ["A", "B"],
    "address": {
      "street": "123 Main St",
      "city": "Anytown"
    }
  },
  {
    "id": 2,
    "name": "Bob",
    "age": 25,
    "salary": 45000.75,
    "isActive": false,
    "comments": "Another comment.",
    "tags": ["C"],
    "address": {
      "street": "456 Oak Ave",
      "city": "Otherville"
    }
  },
  {
    "id": 3,
    "name": "Charlie",
    "age": null,
    "salary": null,
    "isActive": true,
    "comments": "Invalid JSON",
    "tags": ["D", "E"]
  },
  {
    "id": 4,
    "name": "David",
    "age": 40,
    "salary": 60000.00,
    "isActive": true,
    "date_joined": "2023-01-15",
    "timestamp_event": "2023-01-15 10:30:00.123"
  },
  {
    "id": 5,
    "name": "Eve",
    "age": 35,
    "salary": 55555.555,
    "isActive": true,
    "comments": "This has 'single quotes'.",
    "field with space": "value"
  },
  {
    "id": 6,
    "name": "Frank",
    "age": 28,
    "salary": 12345.678,
    "isActive": true,
    "comments": "Escaped chars: \\n\\t\\r",
    "tags": ["F"],
    "decimal_val": 12345.678
  },
  {
    "invalid":"invalid"
  }  
]
"""

# JSON file creation
import os
file_path = "/home/hduser/employe_json/sample.json"
directory = os.path.dirname(file_path)
os.makedirs(directory, exist_ok=True)
with open("/home/hduser/employe_json/sample.json", "w") as f:
    f.write(samplejson)

# Define a custom schema
custom_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", DecimalType(10, 3), True),
    StructField("isActive", BooleanType(), True),
    StructField("comments", StringType(), True),
    StructField("tags", ArrayType(StringType()), True),
    StructField("address", StructType([
        StructField("street", StringType(), True),
        StructField("city", StringType(), True)
    ])),
    StructField("date_joined", DateType(), True),
    StructField("timestamp_event", TimestampType(), True),
    StructField("corrupted_record", StringType(), True),
])

# Read JSON with multiple options and inline comments
df = spark.read.json(
    path="file:///home/hduser/employe_json/",  # Specifies the location of the JSON file(s) to read.
    schema=custom_schema,  # Defines a custom schema to avoid automatic inference.
    primitivesAsString=False,  # Treats all primitive values (int, float, bool) as strings if True.
    prefersDecimal=True,  # Infers floating-point numbers as DecimalType instead of DoubleType.
    allowComments=True,  # Allows Java/C++ style comments (//, /* */) in JSON.
    allowUnquotedFieldNames=True,  # Accepts JSON keys without double quotes.
    allowSingleQuotes=True,  # Accepts single quotes for string values.
    allowBackslashEscapingAnyCharacter=True,  # Allows any character to be escaped with a backslash.
    mode="PERMISSIVE",  # Defines how to handle corrupt records (PERMISSIVE, DROPMALFORMED, FAILFAST).
    columnNameOfCorruptRecord="corrupted_record",  # Stores malformed JSON strings in a specified column.
    dateFormat="yyyy-MM-dd",  # Specifies the format for parsing date strings.
    timestampFormat="yyyy-MM-dd HH:mm:ss.SSS",  # Specifies the format for parsing timestamp strings.
    multiLine=True,  # Treats the entire file as a single JSON object (for pretty-printed or array JSON).
    allowUnquotedControlChars=True,  # Allows control characters (e.g., \n, \t) to appear unquoted.
    lineSep="\n",  # Defines a custom line separator between JSON records. If multiline=True then it is not required.
    samplingRatio=1.0,  # Sets the fraction of data used for schema inference.
    encoding="UTF-8",  # Specifies the character encoding (e.g., UTF-8, UTF-16).
    locale="en-US",  # Sets the locale for parsing locale-sensitive data like dates.
    pathGlobFilter="*.json",  # Filters files using glob patterns (e.g., *.json).
    recursiveFileLookup=True  # Enables recursive search in subdirectories.
)

df.printSchema()
df.show(truncate=False)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: decimal(10,3) (nullable = true)
 |-- isActive: boolean (nullable = true)
 |-- comments: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- address: struct (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- city: string (nullable = true)
 |-- date_joined: date (nullable = true)
 |-- timestamp_event: timestamp (nullable = true)
 |-- corrupted_record: string (nullable = true)

+----+-------+----+---------+--------+-------------------------+------+-------------------------+-----------+-----------------------+----------------+
|id  |name   |age |salary   |isActive|comments                 |tags  |address                  |date_joined|timestamp_event        |corrupted_record|
+----+-------+----+---------+--------+-------------------------+------+-------------------------+-----------+-----

## **5. Reading a CSV data with various options**

In [12]:
from pyspark.sql.types import DoubleType

# Data
data = """
symbol,exchange,date,timestamp,price,volume
AAPL,NYSE,2023-08-01,2023-08-01 09:30:00,195.25,1200000
GOOGL,NYSE,2023-08-01,2023-08-01 09:30:00,2735.55,850000
MSFT,NYSE,2023-08-01,2023-08-01 09:30:00,-1,950000
TSLA,NYSE,2023-08-01,2023-08-01 09:30:00,Inf,1100000
AMZN,NYSE,na,2023-08-01 09:30:00,134.25,na
MSFK,NYSE,2023-08-01,2023-08-01 09:30,100.01,950000
INVALID_ROW_WITHOUT_PROPER_FIELDS       
"""

# CSV file creation
file_path = "/home/hduser/employe_csv/sample.csv"
directory = os.path.dirname(file_path)
os.makedirs(directory, exist_ok=True)
with open(file_path, "w") as f:
    f.write(data)

# Define custom schema
customschema = StructType([
    StructField("symbol", StringType(), True),
    StructField("exchange", StringType(), True),
    StructField("date", DateType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("price", DoubleType(), True),
    StructField("volume", IntegerType(), True),
    StructField("corrupted_data", StringType(), True)
])

# Read CSV with various options
df1 = spark.read.csv(
    path="file:///home/hduser/employe_csv/sample.csv",
    sep=',',  # Column separator used in the CSV file
    header=True,  # First line of the file contains column headers
    schema=customschema,  # Custom schema to define data types and structure
    columnNameOfCorruptRecord='corrupted_data',  # Stores malformed rows in this column
    encoding='UTF-8',  # Character encoding used to read the file
    quote="'",  # Defines single quote as the string quoting character
    comment='-',  # Lines starting with '-' are treated as comments and ignored
    ignoreTrailingWhiteSpace=True,  # Trims trailing whitespace from fields
    ignoreLeadingWhiteSpace=True,  # Trims leading whitespace from fields
    nullValue='na',  # Treats 'na' as a null value
    nanValue='-1',  # Treats '-1' as NaN (Not a Number)
    positiveInf='Inf',  # Treats 'Inf' as positive infinity
    dateFormat='yyyy-MM-dd',  # Format used to parse date fields
    timestampFormat='yyyy-MM-dd HH:mm:ss',  # Format used to parse timestamp fields
    maxColumns=40  # Maximum number of columns allowed in the file
)

# Show first 10 rows
df1.show(10, False)

print("[INFO] Corruputed Rows")
# Cache and filter corrupted rows
df2 = df1.cache().where("corrupted_data is not null")
df2.show(10, False)  # Display malformed rows for RCA


+---------------------------------+--------+----------+-------------------+--------+-------+---------------------------------------------------+
|symbol                           |exchange|date      |timestamp          |price   |volume |corrupted_data                                     |
+---------------------------------+--------+----------+-------------------+--------+-------+---------------------------------------------------+
|AAPL                             |NYSE    |2023-08-01|2023-08-01 09:30:00|195.25  |1200000|NULL                                               |
|GOOGL                            |NYSE    |2023-08-01|2023-08-01 09:30:00|2735.55 |850000 |NULL                                               |
|MSFT                             |NYSE    |2023-08-01|2023-08-01 09:30:00|NaN     |950000 |NULL                                               |
|TSLA                             |NYSE    |2023-08-01|2023-08-01 09:30:00|Infinity|1100000|NULL                                  

## **6. ORC & Parquet file format for Performance Optimization**

In [13]:
# Data SCHEMA MIGRATION from csv (Struct) to ORC/Parquet (serialized-binary) and load into our DATALAKE

# Data
data =""" 
stock_symbol,exchange,date,timestamp,price,volume
AAPL,NYSE,2023-08-01,2023-08-01 09:30:00,195.25,1200000
GOOGL,NYSE,2023-08-01,2023-08-01 09:30:00,2735.55,850000
MSFT,NYSE,2023-08-01,2023-08-01 09:30:00,-1,950000
TSLA,NYSE,2023-08-01,2023-08-01 09:30:00,Inf,1100000
AMZN,NYSE,na,2023-08-01 09:30:00,134.25,na
MSFK,NYSE,2023-08-01,2023-08-01 09:30,100.01,950000
INVALID_ROW_WITHOUT_PROPER_FIELDS 
"""
 
with open("/tmp/nyse_header_options2.csv", "w") as f:
    f.write(data)
  
# Define custom schema
customschema = StructType([
    StructField("symbol", StringType(), True),
    StructField("exchange", StringType(), True),
    StructField("date", DateType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("price", DoubleType(), True),
    StructField("volume", IntegerType(), True),
    StructField("corrupted_data", StringType(), True)
])
 
# Read CSV with various options
df1 = spark.read.csv(
    'file:///tmp/nyse_header_options2.csv',
    sep=',',  # Column separator used in the CSV file
    header=True,  # First line of the file contains column headers
    schema=customschema,  # Custom schema to define data types and structure
    columnNameOfCorruptRecord='corrupted_data',  # Stores malformed rows in this column
    encoding='UTF-8',  # Character encoding used to read the file
    quote="'",  # Defines single quote as the string quoting character
    comment='-',  # Lines starting with '-' are treated as comments and ignored
    ignoreTrailingWhiteSpace=True,  # Trims trailing whitespace from fields
    ignoreLeadingWhiteSpace=True,  # Trims leading whitespace from fields
    nullValue='na',  # Treats 'na' as a null value
    nanValue='-1',  # Treats '-1' as NaN (Not a Number)
    positiveInf='Inf',  # Treats 'Inf' as positive infinity
    dateFormat='yyyy-MM-dd',  # Format used to parse date fields
    timestampFormat='yyyy-MM-dd HH:mm:ss',  # Format used to parse timestamp fields
    maxColumns=40  # Maximum number of columns allowed in the file
 
)
 
# Show first 10 rows
print("[INFO] Scrubbed Data")
df1.show(10, False)
 
# Cache and filter corrupted rows
df2 = df1.cache().where("corrupted_data is null")
print("[INFO] Cureated Data")
df2.show(10, False)  
 

[INFO] Scrubbed Data
+---------------------------------+--------+----------+-------------------+--------+-------+---------------------------------------------------+
|symbol                           |exchange|date      |timestamp          |price   |volume |corrupted_data                                     |
+---------------------------------+--------+----------+-------------------+--------+-------+---------------------------------------------------+
|AAPL                             |NYSE    |2023-08-01|2023-08-01 09:30:00|195.25  |1200000|NULL                                               |
|GOOGL                            |NYSE    |2023-08-01|2023-08-01 09:30:00|2735.55 |850000 |NULL                                               |
|MSFT                             |NYSE    |2023-08-01|2023-08-01 09:30:00|NaN     |950000 |NULL                                               |
|TSLA                             |NYSE    |2023-08-01|2023-08-01 09:30:00|Infinity|1100000|NULL             

25/08/05 21:45:04 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: stock_symbol, exchange, date, timestamp, price, volume
 Schema: symbol, exchange, date, timestamp, price, volume
Expected: symbol but found: stock_symbol
CSV file: file:///tmp/nyse_header_options2.csv
25/08/05 21:45:04 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: stock_symbol, exchange, date, timestamp, price, volume
 Schema: symbol, exchange, date, timestamp, price, volume
Expected: symbol but found: stock_symbol
CSV file: file:///tmp/nyse_header_options2.csv


In [14]:
# Data SCHEMA MIGRATION from csv (Struct) to ORC (serialized-binary) and load into our DATALAKE
# Write the clean data in ORC format for internal teams data querying furpose
df2.write.orc('file:///tmp/stocks_orc',mode='overwrite') #(Datalake) hdfs:///user/hduser/custorcout
df2.write.orc('file:///tmp/stocks_orc_lzo',mode='ignore',compression='lzo')
df2.write.orc('file:///tmp/stocks_orc_lzo_part',mode='overwrite',partitionBy='date')

# Reading the data from ORC
df_orc = spark.read.orc("file:///tmp/stocks_orc_lzo_part")
print("[INFO] Reading data from ORC")
df_orc.show(truncate=False)
 
df_orc_sql = spark.sql("select * from orc.`file:///tmp/stocks_orc_lzo_part` where date is not null")
print("[INFO] Reading data from ORC using SQL")
df_orc_sql.show(truncate=False)


[INFO] Reading data from ORC
+------+--------+-------------------+--------+-------+--------------+----------+
|symbol|exchange|timestamp          |price   |volume |corrupted_data|date      |
+------+--------+-------------------+--------+-------+--------------+----------+
|GOOGL |NYSE    |2023-08-01 09:30:00|2735.55 |850000 |NULL          |2023-08-01|
|MSFT  |NYSE    |2023-08-01 09:30:00|NaN     |950000 |NULL          |2023-08-01|
|TSLA  |NYSE    |2023-08-01 09:30:00|Infinity|1100000|NULL          |2023-08-01|
|AAPL  |NYSE    |2023-08-01 09:30:00|195.25  |1200000|NULL          |2023-08-01|
|AMZN  |NYSE    |2023-08-01 09:30:00|134.25  |NULL   |NULL          |NULL      |
+------+--------+-------------------+--------+-------+--------------+----------+

[INFO] Reading data from ORC using SQL
+------+--------+-------------------+--------+-------+--------------+----------+
|symbol|exchange|timestamp          |price   |volume |corrupted_data|date      |
+------+--------+-------------------+---

In [15]:
# Data SCHEMA MIGRATION from csv (Struct) to PARQUET (serialized-binary) and load into our DATALAKE
# Write the clean data in ORC format for internal teams data querying furpose
df2.write.parquet('file:///tmp/stocks_parquet',mode='overwrite') #(Datalake) hdfs:///user/hduser/custorcout
df2.write.parquet('file:///tmp/stocks_parquet_lzo',mode='ignore',compression='snappy')
df2.write.parquet('file:///tmp/stocks_parquet_part',mode='overwrite',partitionBy='date')
 
# Reading the data from Parquet
df_orc = spark.read.parquet("file:///tmp/stocks_parquet_part")
print("[INFO] Reading data from Parquet")
df_orc.show(truncate=False)
 
df_orc_sql = spark.sql("select * from parquet.`file:///tmp/stocks_parquet_part` where date is not null")
print("[INFO] Reading data from Parquet using SQL")
df_orc_sql.show(truncate=False)


[INFO] Reading data from Parquet
+------+--------+-------------------+--------+-------+--------------+----------+
|symbol|exchange|timestamp          |price   |volume |corrupted_data|date      |
+------+--------+-------------------+--------+-------+--------------+----------+
|GOOGL |NYSE    |2023-08-01 09:30:00|2735.55 |850000 |NULL          |2023-08-01|
|MSFT  |NYSE    |2023-08-01 09:30:00|NaN     |950000 |NULL          |2023-08-01|
|TSLA  |NYSE    |2023-08-01 09:30:00|Infinity|1100000|NULL          |2023-08-01|
|AAPL  |NYSE    |2023-08-01 09:30:00|195.25  |1200000|NULL          |2023-08-01|
|AMZN  |NYSE    |2023-08-01 09:30:00|134.25  |NULL   |NULL          |NULL      |
+------+--------+-------------------+--------+-------+--------------+----------+

[INFO] Reading data from Parquet using SQL
+------+--------+-------------------+--------+-------+--------------+----------+
|symbol|exchange|timestamp          |price   |volume |corrupted_data|date      |
+------+--------+---------------

## **7. PySpark and Hive Integration : Data Ingestion and Table Creation**

In [16]:
# This script demonstrates various methods for writing data from a PySpark
# DataFrame into Hive tables, highlighting best practices, limitations,
# and common use cases.

In [17]:
from pyspark.sql.types import ShortType

# Placeholder for the data and schema
cust_schema = StructType([
    StructField('cid', IntegerType(), nullable=False),
    StructField('fname', StringType()),
    StructField('lname', StringType()),
    StructField('age', ShortType()),
    StructField('profession', StringType())
])

df1 = spark.read.csv(
     'file:///home/hduser/custinfo.csv',
     schema=cust_schema,
     header=False,
     sep=',',
     mode='dropmalformed'
)

df1.show()

+-------+--------+----------+---+--------------------+
|    cid|   fname|     lname|age|          profession|
+-------+--------+----------+---+--------------------+
|4000001|Kristina|     Chung| 55|               Pilot|
|4000002|   Paige|      Chen| 77|             Teacher|
|4000003|  Sherri|    Melton| 34|         Firefighter|
|4000004|Gretchen|      Hill| 66|Computer hardware...|
|4000005|   Karen|   Puckett| 74|              Lawyer|
|4000006| Patrick|      Song| 42|        Veterinarian|
|4000007|   Elsie|  Hamilton| 43|               Pilot|
|4000008|   Hazel|    Bender| 63|           Carpenter|
|4000009| Malcolm|    Wagner| 39|              Artist|
|4000010| Dolores|McLaughlin| 60|              Writer|
|4000011| Francis|  McNamara| 47|           Therapist|
|4000012|   Sandy|    Raynor| 26|              Writer|
|4000013|  Marion|      Moon| 41|           Carpenter|
|4000014|    Beth|   Woodard| 65|                NULL|
|4000015|   Julia|     Desai| 49|            Musician|
|4000016| 

In [None]:
# ==============================================================================
# Method 2: Using the `insertInto` function
#
# This method loads data into an existing table. It's less common for initial
# table creation and data loading, as it does not create a new table schema.
# ==============================================================================
print("--- Method 2: Inserting data into an existing table using insertInto ---")
# This requires the 'default.customers' table to already exist.
# The schema of the DataFrame must match the table schema.
# df1.write.insertInto('wholesale.customers', overwrite=True)

In [None]:
# ==============================================================================
# Method 3: Storing as CSV (PySpark only)
#
# This method creates a table with data stored in CSV format. This table is
# typically only accessible and readable via PySpark, not directly via HiveQL.
# The SerDe (Serializer/Deserializer) for Spark-written CSVs is not
# compatible with Hive's default TextFile SerDe.
# ==============================================================================
print("--- Method 3: Creating a CSV table (PySpark-only access) ---")
# Creates a managed table with data stored as CSV files.
# Hive CLI will not be able to read this table correctly.
df1.write.saveAsTable(
    'default.customers_csv',
    format='csv',
    sep=',',
    mode='overwrite'
)

In [None]:
# ==============================================================================
# Method 4: Using Hive Literal Syntax
#
# This approach uses direct HiveQL statements to create a table that is
# fully compatible with both Hive and PySpark. It is the proper way to
# meet a requirement for a TextFile table with a specific delimiter.
# ==============================================================================
print("--- Method 4: Creating a TextFile table using HiveQL (interoperable) ---")
# Step 1: Create the table using HiveQL with the specified row format.
spark.sql("""
   CREATE TABLE default.customers_text (
        id INT,
        fname STRING,
        lname STRING,
        age INT,
        prof STRING
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
""")

# Step 2: Load the data into the newly created table.
# Note: 'local' means the file is on the driver's local filesystem.

spark.sql(
    "LOAD DATA LOCAL INPATH 'file:///home/hduser/custinfo.csv' OVERWRITE INTO TABLE default.customers_text"
)


In [None]:
# ==============================================================================
# Method 5: Marrying DataFrame to a Hive Table using a View and Insert Select
#
# This is a common pattern to load data from a DataFrame into an existing
# Hive table, providing full interoperability.
# ==============================================================================
print("--- Method 5: Using Insert Select from a temporary view ---")
# Step 1: Create a temporary view from the DataFrame.
df1.createOrReplaceTempView("view1")

# Step 2: Insert data from the view into an existing Hive table.
# Assumes 'default.customers_text' exists and has a compatible schema.
spark.sql("INSERT OVERWRITE TABLE default.customers_csv SELECT * FROM view1")

In [None]:
# ==============================================================================
# Creating External Tables
#
# Demonstrates creating external tables, where the data is not managed by Hive,
# using both non-partitioned and partitioned approaches.
# ==============================================================================
print("--- Creating a non-partitioned external table ---")
spark.sql("""
    CREATE EXTERNAL TABLE default.customers_text_ext (
        id INT,
        fname STRING,
        lname STRING,
        age INT,
        prof STRING
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    STORED AS TEXTFILE 
    LOCATION '/user/hduser/customer_ext_table'
""")
# Loading data into the external table.
spark.sql(
    "LOAD DATA LOCAL INPATH 'file:///home/hduser/custinfo.csv' OVERWRITE INTO TABLE default.customers_text_ext"
)

print("--- Creating a partitioned external table ---")
spark.sql("""
    CREATE EXTERNAL TABLE default.customers_text_ext_part (
        id INT,
        fname STRING,
        lname STRING,
        age INT
    )
    PARTITIONED BY (prof STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    STORED AS TEXTFILE 
    LOCATION '/user/hduser/customer_ext_table_part'
""")

# Note: `LOAD DATA` does not support dynamic partitioning.
# Instead, we use `INSERT OVERWRITE` with dynamic partitioning enabled.
spark.sql("SET spark.sql.sources.partitionOverwriteMode=dynamic") # Use Spark setting for partition overwrite mode
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

# Insert data dynamically into the partitioned table.
spark.sql(
    "INSERT OVERWRITE TABLE default.customers_text_ext_part PARTITION(prof) SELECT id, fname, lname, age, prof FROM view1"
)

In [None]:
# ==============================================================================
# Spark and Hive Bucketing Incompatibility
#
# This is a critical point about a known limitation. Spark and Hive use
# different hashing algorithms for bucketing, making them incompatible.
# ==============================================================================
print("--- Demonstrating Spark-Hive Bucketing Incompatibility ---")
# Drop the table if it already exists
spark.sql("DROP TABLE IF EXISTS default.customers_text_ext_part_bucket")

# Create a bucketed table using HiveQL
spark.sql("""
    CREATE EXTERNAL TABLE default.customers_text_ext_part_bucket (
        id INT,
        fname STRING,
        lname STRING,
        age INT,
        prof STRING
    )
    CLUSTERED BY (id) INTO 10 BUCKETS
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    STORED AS TEXTFILE 
    LOCATION '/user/hduser/customer_ext_table_part_bucket'
""")

# Attempting to load data from a DataFrame into this table will fail
# because Spark's bucketing algorithm is not compatible with Hive's.
# The following line will raise an AnalysisException.
# spark.sql("INSERT OVERWRITE TABLE wholesale.customers_text_ext_part_bucket SELECT * FROM view1")


In [None]:
# ==============================================================================
# Solution for High Performance: Combining Partitioning, Bucketing, and Columnar Format
#
# This code shows how to create a highly optimized table entirely within
# PySpark, which is best for Spark-native queries.
# Default format is Parquet to saving the data and Snappy codec for compression.
# ==============================================================================
print("--- Creating a highly performant table using PySpark native methods ---")
df1.write.bucketBy(10, 'cid').sortBy("cid").\
    saveAsTable(
        'default.customers_part_buck_parquet_snappy',
        mode='overwrite',
        partitionBy='profession'
    )

print("--- Conclusion on PySpark and Hive Integration ---")
print("PySpark and Hive have strong integration, but it's crucial to understand")
print("the differences in their internal implementations, especially concerning")
print("file formats and bucketing algorithms, to ensure interoperability and performance.")