**1. Extract data from a single file and from a directory of files**

In [0]:
dbutils.fs.ls("/FileStore/tables/")

Out[1]: [FileInfo(path='dbfs:/FileStore/tables/Customer_Master.csv', name='Customer_Master.csv', size=124, modificationTime=1745764108000),
 FileInfo(path='dbfs:/FileStore/tables/Sales_Transactions-1.csv', name='Sales_Transactions-1.csv', size=172, modificationTime=1745757656000),
 FileInfo(path='dbfs:/FileStore/tables/Sales_Transactions-2.csv', name='Sales_Transactions-2.csv', size=172, modificationTime=1745764108000),
 FileInfo(path='dbfs:/FileStore/tables/Sales_Transactions.csv', name='Sales_Transactions.csv', size=172, modificationTime=1745757577000),
 FileInfo(path='dbfs:/FileStore/tables/employees_with_bad_records.csv', name='employees_with_bad_records.csv', size=401, modificationTime=1746024104000),
 FileInfo(path='dbfs:/FileStore/tables/new/', name='new/', size=0, modificationTime=1745765163000),
 FileInfo(path='dbfs:/FileStore/tables/sample_json_file.json', name='sample_json_file.json', size=147, modificationTime=1746023417000)]

Load single file CSV

In [0]:
df= spark.read.csv("/FileStore/tables/Customer_Master.csv",header=True,inferSchema=True)

In [0]:
df2=spark.read.csv("/FileStore/tables/*.csv",header=True,inferSchema=True)

In [0]:
df.show()


+----------+-----------+-----------+-------+
|CustomerID|       Name|    Segment|Country|
+----------+-----------+-----------+-------+
|      C101| John Smith|   Consumer|    USA|
|      C102|   Jane Doe|  Corporate| Canada|
|      C103|Alice Brown|Home Office|     UK|
+----------+-----------+-----------+-------+



In [0]:
df2.show()

+----------+----------+--------+-----------+----------+-------------+-----------+
|EmployeeID| FirstName|LastName|        Age|Department|       Salary|ExtraColumn|
+----------+----------+--------+-----------+----------+-------------+-----------+
|       101|      John|     Doe|         29|        IT|        55000|       null|
|       102|      Jane|   Smith|         34|        HR|        62000|       null|
|       103|       Bob|    null|         28|   Finance|        51000|       null|
|       104|     Linda|   White|twenty-nine|        IT|        58000|       null|
|       105|     James|    Bond|         35|Operations|         null|       null|
|       106|     Sarah|  Connor|         31|        HR|        60000|       null|
|       107|       Tom|   Brady|         44|     Sales|not_available|       null|
|       108|     Alice|   Brown|         27|      null|         null|       null|
|       109|      null|Williams|         30|   Finance|        54000|       null|
|       110|    

Read single Json or multile JSON Files

In [0]:
dbutils.fs.ls("/FileStore/tables/")

Out[9]: [FileInfo(path='dbfs:/FileStore/tables/Customer_Master.csv', name='Customer_Master.csv', size=124, modificationTime=1745764108000),
 FileInfo(path='dbfs:/FileStore/tables/Sales_Transactions-1.csv', name='Sales_Transactions-1.csv', size=172, modificationTime=1745757656000),
 FileInfo(path='dbfs:/FileStore/tables/Sales_Transactions-2.csv', name='Sales_Transactions-2.csv', size=172, modificationTime=1745764108000),
 FileInfo(path='dbfs:/FileStore/tables/Sales_Transactions.csv', name='Sales_Transactions.csv', size=172, modificationTime=1745757577000),
 FileInfo(path='dbfs:/FileStore/tables/employees_with_bad_records.csv', name='employees_with_bad_records.csv', size=401, modificationTime=1746024104000),
 FileInfo(path='dbfs:/FileStore/tables/new/', name='new/', size=0, modificationTime=1745765163000),
 FileInfo(path='dbfs:/FileStore/tables/sample_json_file.json', name='sample_json_file.json', size=147, modificationTime=1746023417000),
 FileInfo(path='dbfs:/FileStore/tables/sample_pa

In [0]:
%python
dbutils.fs.head("/FileStore/tables/sample_patients.json")

Out[13]: '[\n  {\n    "PatientID": "P001",\n    "FirstName": "John",\n    "LastName": "Doe",\n    "DateOfBirth": "1985-06-15",\n    "Gender": "Male",\n    "PhoneNumber": "555-0101",\n    "Address": "123 Main St, Springfield"\n  },\n  {\n    "PatientID": "P002",\n    "FirstName": "Jane",\n    "LastName": "Smith",\n    "DateOfBirth": "1990-09-22",\n    "Gender": "Female",\n    "PhoneNumber": "555-0102",\n    "Address": "456 Oak Ave, Rivertown"\n  },\n  {\n    "PatientID": "P003",\n    "FirstName": "Alex",\n    "LastName": "Johnson",\n    "DateOfBirth": "1978-03-10",\n    "Gender": "Non-Binary",\n    "PhoneNumber": "555-0103",\n    "Address": "789 Pine Rd, Hillcity"\n  }\n]'

In [0]:
df3=spark.read.option("multiline","true").option("inferschema","true").json("/FileStore/tables/sample_patients.json")


In [0]:
df3.display()

Address,DateOfBirth,FirstName,Gender,LastName,PatientID,PhoneNumber
"123 Main St, Springfield",1985-06-15,John,Male,Doe,P001,555-0101
"456 Oak Ave, Rivertown",1990-09-22,Jane,Female,Smith,P002,555-0102
"789 Pine Rd, Hillcity",1978-03-10,Alex,Non-Binary,Johnson,P003,555-0103


In [0]:
df4=spark.read.option("multiline","true").option("inferschema","true").json("/FileStore/tables/*.json")

In [0]:
df4.display()

Address,Boolean,DateOfBirth,FirstName,Gender,LastName,Mobile,Name,PatientID,Pets,PhoneNumber
"123 Main St, Springfield",,1985-06-15,John,Male,Doe,,,P001,,555-0101
"456 Oak Ave, Rivertown",,1990-09-22,Jane,Female,Smith,,,P002,,555-0102
"789 Pine Rd, Hillcity",,1978-03-10,Alex,Non-Binary,Johnson,,,P003,,555-0103
"{""Permanent address"":""USA"",""current Address"":""AU""}",True,,,,,12345678.0,Test,,"List(Dog, cat)",


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, ArrayType

def flatten_df(nested_df):
    """
    Recursively flattens a nested DataFrame.
    """
    flat_cols = []
    nested_cols = []

    for column_name, dtype in nested_df.dtypes:
        if dtype.startswith('struct'):
            nested_cols.append(column_name)
        elif dtype.startswith('array<struct'):
            # You can choose to explode here if needed
            nested_cols.append(column_name)
        else:
            flat_cols.append(column_name)

    # Select flat columns as is
    flat_df = nested_df.select(flat_cols + [
        col(nc + '.' + subcol).alias(nc + '_' + subcol)
        for nc in nested_cols
        for subcol in nested_df.select(nc + '.*').columns
    ])

    # Repeat until all fields are flat
    if any([t.startswith('struct') or t.startswith('array<struct') for _, t in flat_df.dtypes]):
        return flatten_df(flat_df)
    else:
        return flat_df

In [0]:
df5=flatten_df(df4)
df5.display()


Address,Boolean,DateOfBirth,FirstName,Gender,LastName,Mobile,Name,PatientID,Pets,PhoneNumber
"123 Main St, Springfield",,1985-06-15,John,Male,Doe,,,P001,,555-0101
"456 Oak Ave, Rivertown",,1990-09-22,Jane,Female,Smith,,,P002,,555-0102
"789 Pine Rd, Hillcity",,1978-03-10,Alex,Non-Binary,Johnson,,,P003,,555-0103
"{""Permanent address"":""USA"",""current Address"":""AU""}",True,,,,,12345678.0,Test,,"List(Dog, cat)",


**Identify the prefix included after the FROM keyword as the data type.**

Ah, I see what you're asking now — you're referring to how **PySpark infers the schema (data types)** when reading JSON or other structured data, particularly with **nested structures** like structs and arrays.

In PySpark, when you load a JSON file or create a DataFrame, **the structure following `FROM` in a SQL query** is actually the **table or view name**, and the "prefix" seen in field names like `address.city` refers to a **struct data type**.

### ✅ Example:

Suppose you have this JSON data:

```json
{
  "name": "Alice",
  "address": {
    "city": "New York",
    "zip": "10001"
  }
}
```

When loaded into PySpark:

```python
df = spark.read.json("data.json")
df.printSchema()
```

You get:

```
root
 |-- name: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- zip: string (nullable = true)
```

### 🔍 Now in SQL:

If you register this DataFrame as a view:

```python
df.createOrReplaceTempView("people")
spark.sql("SELECT address.city FROM people").show()
```

Here:

* `address` is the **prefix**.
* It corresponds to the **field of type `struct`**.
* `city` is a **nested field** inside the struct.

### ✅ So, to directly answer:

> **The prefix after the `FROM` keyword (e.g., `address` in `address.city`) in PySpark represents a field of type `struct` (i.e., a nested data type).**

Would you like code to flatten this `struct` field into top-level columns?


**Identify that tables from external sources are not Delta Lake tables.** 

In [0]:
tables = spark.catalog.listTables()
for table in tables:
    print(f"Name: {table.name}, Type: {table.tableType}, Is Temporary: {table.isTemporary}")

Name: customer_master_csv, Type: MANAGED, Is Temporary: False
Name: sales_data_sample, Type: MANAGED, Is Temporary: False
Name: sales_transactions_1_csv, Type: MANAGED, Is Temporary: False
Name: sample_json_file_json, Type: EXTERNAL, Is Temporary: False


In [0]:
display(spark.sql("DESCRIBE FORMATTED customer_master_csv"))

col_name,data_type,comment
CustomerID,string,
Name,string,
Segment,string,
Country,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,default,
Table,customer_master_csv,
Created Time,Sun Apr 27 14:29:52 UTC 2025,


In [0]:
from delta.tables import DeltaTable

is_delta = DeltaTable.isDeltaTable(spark, "/user/hive/warehouse/customer_master_csv")
print("Delta table?" , is_delta)

Delta table? True


In [0]:
from delta.tables import DeltaTable

display(DeltaTable.isDeltaTable(spark,"dbfs:/user/hive/warehouse/customer_master_csv"))

True

**Create a table from a JDBC connection and from an external CSV file**

In [0]:
%sql
/*jdbc_url = "jdbc:mysql://your-host:3306/your_database"
df_jdbc = spark.read.jdbc(
    url=jdbc_url,
    table="your_table_name",
    properties=connection_props
*/

In [0]:
#vijaykanna.database.windows.net
#vijaykanna
#101
#Vijay@272
#jdbc:sqlserver://vijaykanna.database.windows.net:1433;database=101;user=vijaykanna@vijaykanna;password={your_password_here};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;

In [0]:
%python
jdbc_url = "jdbc:sqlserver://vijaykanna.database.windows.net:1433;database=101;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30"

# Retrieve the username and password from Databricks secrets
#username = dbutils.secrets.get(scope="my-scope", key="db-username")
#password = dbutils.secrets.get(scope="my-scope", key="db-password")

# Construct the JDBC URL with the retrieved credentials
jdbc_url_with_credentials = f"{jdbc_url};user=vijaykanna;password=Kanna@272"

df_jdbc = spark.read.jdbc(
    url=jdbc_url_with_credentials,
    table="Appointments"
)

display(df_jdbc)

AppointmentID,PatientID,DoctorID,AppointmentDate,Reason,Status
1,1,1,2025-04-10T10:00:00.000+0000,Chest pain,Completed
2,2,2,2025-04-11T11:00:00.000+0000,Fever and cough,Scheduled


In [0]:
df_jdbc.write.saveAsTable("FirstDeltaTable")

In [0]:
display(spark.table("FirstDeltaTable"))

AppointmentID,PatientID,DoctorID,AppointmentDate,Reason,Status
1,1,1,2025-04-10T10:00:00.000+0000,Chest pain,Completed
2,2,2,2025-04-11T11:00:00.000+0000,Fever and cough,Scheduled


In [0]:
csv=spark.read.csv("/FileStore/tables/Customer_Master.csv",header="true",inferSchema="true")

In [0]:
csv.write.saveAsTable("csvtable")

In [0]:
display(spark.table("csvtable"))

CustomerID,Name,Segment,Country
C101,John Smith,Consumer,USA
C102,Jane Doe,Corporate,Canada
C103,Alice Brown,Home Office,UK


Create a view, a temporary view, and a CTE as a reference to a file 

In [0]:
csv.createOrReplaceTempView("Tempview")
display(spark.table("Tempview"))

CustomerID,Name,Segment,Country
C101,John Smith,Consumer,USA
C102,Jane Doe,Corporate,Canada
C103,Alice Brown,Home Office,UK


In [0]:
csv.createGlobalTempView("hh")
csv.createTempView("hhh")

In [0]:
spark.sql("select * from global_temp.hh").display()

CustomerID,Name,Segment,Country
C101,John Smith,Consumer,USA
C102,Jane Doe,Corporate,Canada
C103,Alice Brown,Home Office,UK


In [0]:
spark.sql("select * from hhh").display()

CustomerID,Name,Segment,Country
C101,John Smith,Consumer,USA
C102,Jane Doe,Corporate,Canada
C103,Alice Brown,Home Office,UK


Identify how the count_if function and the count where x is null can be used 

In [0]:
condition=spark.sql("select * from hhh")

In [0]:
d10=condition.filter(col("Name").isNull()) 

d11=condition.filter(col("Name").isNotNull())

In [0]:
d10.display()
d11.display()
display

CustomerID,Name,Segment,Country


CustomerID,Name,Segment,Country
C101,John Smith,Consumer,USA
C102,Jane Doe,Corporate,Canada
C103,Alice Brown,Home Office,UK


1. count_if Function in PySpark
In PySpark SQL, count_if can be used to count rows based on a condition, similar to how it's used in standard SQL. The function is available through Spark SQL.

Syntax:
sql
Copy
Edit
count_if(condition)
Example: Count Rows Based on Condition Using count_if
python
Copy
Edit
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create Spark session
spark = SparkSession.builder.appName("count_if_example").getOrCreate()

# Sample DataFrame
data = [("John", 50000), ("Jane", None), ("Sam", 60000), ("Doe", None)]
columns = ["name", "salary"]
df = spark.createDataFrame(data, columns)

# Register the DataFrame as a temporary view to use SQL
df.createOrReplaceTempView("employees")

# Use SQL to count rows where salary > 50000 (using count_if)
result = spark.sql("SELECT count_if(salary > 50000) AS high_salary_count FROM employees")
result.show()

# Use SQL to count rows where salary is NULL (using count_if)
result_null = spark.sql("SELECT count_if(salary IS NULL) AS null_salary_count FROM employees")
result_null.show()
2. COUNT Where x is NULL in PySpark
If you want to count the number of rows where a specific column (x) is NULL, you can use the count function in combination with when and isNull() in PySpark, or directly use SQL.

a) Using SQL (COUNT with WHERE x IS NULL):
python
Copy
Edit
# Count rows where salary is NULL using SQL
result_null_sql = spark.sql("SELECT COUNT(*) AS null_salary_count FROM employees WHERE salary IS NULL")
result_null_sql.show()
b) Using DataFrame API (count with when and isNull):
python
Copy
Edit
from pyspark.sql.functions import when

# Count rows where salary is NULL using DataFrame API (with `when` and `isNull`)
null_count = df.select(when(col("salary").isNull(), 1).otherwise(None)).count()
print(f"Number of NULL salary rows: {null_count}")
Full Example in PySpark:
Here is the complete example of both methods:

python
Copy
Edit
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when

# Create Spark session
spark = SparkSession.builder.appName("count_if_example").getOrCreate()

# Sample DataFrame
data = [("John", 50000), ("Jane", None), ("Sam", 60000), ("Doe", None)]
columns = ["name", "salary"]
df = spark.createDataFrame(data, columns)

# Register the DataFrame as a temporary view to use SQL
df.createOrReplaceTempView("employees")

# 1. Using `count_if` to count rows with salary > 50000 in Spark SQL
result = spark.sql("SELECT count_if(salary > 50000) AS high_salary_count FROM employees")
result.show()

# 2. Using `count_if` to count rows where salary is NULL in Spark SQL
result_null = spark.sql("SELECT count_if(salary IS NULL) AS null_salary_count FROM employees")
result_null.show()

# 3. Using `COUNT` with `WHERE salary IS NULL` in Spark SQL
result_null_sql = spark.sql("SELECT COUNT(*) AS null_salary_count FROM employees WHERE salary IS NULL")
result_null_sql.show()

# 4. Using DataFrame API with `when` and `isNull` to count NULL values
null_count = df.select(when(col("salary").isNull(), 1).otherwise(None)).count()
print(f"Number of NULL salary rows: {null_count}")
Explanation:
count_if in SQL:

count_if(salary > 50000): Counts the rows where the salary is greater than 50,000.

count_if(salary IS NULL): Counts the rows where the salary is NULL.

Using COUNT with WHERE x IS NULL:

This counts the rows where salary is NULL by using a WHERE clause.

Using DataFrame API with when and isNull:

The when function evaluates whether salary is NULL, and the count() function counts the number of NULL values.

Summary:
count_if: Directly counts rows based on a condition, and is available through Spark SQL (count_if(condition)).

COUNT WHERE x IS NULL: Count rows where a column is NULL using either a WHERE clause in SQL or using when and isNull() in the DataFrame API.

Both approaches can be used effectively in PySpark depending on whether you're working with SQL queries or DataFrame transformations. Let me know if you need any further clarification!

Identify how the count(row) skips NULL values.

In [0]:
from pyspark.sql.functions import count
d10.select("Name").count()         
# Outputs: 3 (includes nulls in column but counts all rows)
d11.select(count("Name")).show()  

+-----------+
|count(Name)|
+-----------+
|          3|
+-----------+



Deduplicate rows from an existing Delta Lake table. 

To deduplicate rows from an existing Delta Lake table in Databricks / PySpark, you can use either:

dropDuplicates() (DataFrame API)

ROW_NUMBER() with WINDOW functions (for fine control over which duplicate to keep)

✅ Method 1: Using dropDuplicates() (Quick & Easy)
Example:
python
Copy
Edit
# Read the Delta table
df = spark.read.format("delta").load("/delta/events")

# Drop duplicate rows based on all columns (full row duplicates)
deduped_df = df.dropDuplicates()

# OR: Drop duplicates based on specific columns (e.g., id and timestamp)
deduped_df = df.dropDuplicates(["id", "timestamp"])

# Overwrite the table or save elsewhere
deduped_df.write.format("delta").mode("overwrite").save("/delta/events_deduped")
🔁 Use "overwrite" mode carefully — it replaces data in the target path or table.

✅ Method 2: Using ROW_NUMBER() to Keep Latest/First Record
Use this when you have multiple rows per ID and want to keep only the latest (or first) based on a timestamp.

Example:
python
Copy
Edit
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Read the Delta table
df = spark.read.format("delta").load("/delta/events")

# Define a window to partition by ID and order by timestamp descending
window_spec = Window.partitionBy("id").orderBy(col("timestamp").desc())

# Add row numbers to identify duplicates
df_with_rownum = df.withColumn("row_num", row_number().over(window_spec))

# Filter to keep only the first row (latest timestamp)
deduped_df = df_with_rownum.filter(col("row_num") == 1).drop("row_num")

# Save or overwrite the deduplicated data
deduped_df.write.format("delta").mode("overwrite").save("/delta/events_deduped")
🔄 Optional: Overwrite the Existing Table
If you want to overwrite the existing Delta table directly:

python
Copy
Edit
deduped_df.write.format("delta").mode("overwrite").save("/delta/events")
Or, using a table name:

python
Copy
Edit
deduped_df.write.format("delta").mode("overwrite").saveAsTable("events")


In [0]:
dataga=spark.sql("select * from csvtable")

In [0]:
display(dataga.dropDuplicates())

CustomerID,Name,Segment,Country
C102,Jane Doe,Corporate,Canada
C101,John Smith,Consumer,USA
C103,Alice Brown,Home Office,UK


In [0]:
display(dataga.dropDuplicates(["name"]))

CustomerID,Name,Segment,Country
C103,Alice Brown,Home Office,UK
C102,Jane Doe,Corporate,Canada
C101,John Smith,Consumer,USA


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Define the window specification
window1 = Window.partitionBy("Name").orderBy(col("country").desc())

# Apply row_number over the window
rank = dataga.withColumn("rank", row_number().over(window1))

In [0]:
rank2=rank.filter(col("rank")==1)

In [0]:
rank2.display()

CustomerID,Name,Segment,Country,rank
C103,Alice Brown,Home Office,UK,1
C102,Jane Doe,Corporate,Canada,1
C101,John Smith,Consumer,USA,1


Create a new table from an existing table while removing duplicate rows. 

# Read from the existing table
df = spark.table("sales_data")

# Drop duplicates based on all columns
df_deduped = df.dropDuplicates()

# Write to a new table
df_deduped.write.format("delta").mode("overwrite").saveAsTable("sales_data_deduplicated")


In [0]:
new=spark.table("csvtable")

dfdrop=new.dropDuplicates()

dfdrop.write.format("delta").mode("overwrite").saveAsTable("csvtable2")

In [0]:
%sql 
select * from csvtable2

CustomerID,Name,Segment,Country
C102,Jane Doe,Corporate,Canada
C101,John Smith,Consumer,USA
C103,Alice Brown,Home Office,UK


from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Define window spec
window_spec = Window.partitionBy("id", "date").orderBy(col("timestamp").desc())

# Assign row numbers
df_ranked = df.withColumn("row_num", row_number().over(window_spec))

# Keep only the first row per group
df_deduped = df_ranked.filter(col("row_num") == 1).drop("row_num")

df_deduped.show()


Validate that the primary key is unique across all rows.

In [0]:
from pyspark.sql.functions import col
#from pyspark.sql.functions import groupBy
new.groupBy("Name").count().filter(col("Name")==1).display()

Name,count


Validate that a field is associated with just one unique value in another field. 

In [0]:
from pyspark.sql.functions import countDistinct

# Check how many unique department values each employee_id has
violations = new.groupBy("Name").agg(countDistinct("Name").alias("dept_count")) \
               .filter("dept_count > 1")

violations.show()

+----+----------+
|Name|dept_count|
+----+----------+
+----+----------+



Validate that a value is not present in a specific field. 


In [0]:
import pyspark.sql.functions as F

new.filter(col("Name").isNotNull()).display()

CustomerID,Name,Segment,Country
C101,John Smith,Consumer,USA
C102,Jane Doe,Corporate,Canada
C103,Alice Brown,Home Office,UK


In [0]:
%sql
/*from pyspark.sql.functions import col

# Assume 'date_str' is a string column like "2025-05-03 12:34:56"
df_with_timestamp = df.withColumn("date_ts", col("date_str").cast("timestamp"))

df_with_timestamp.show()

from pyspark.sql.functions import to_timestamp

# Example: custom format "dd/MM/yyyy HH:mm"
df_with_ts = df.withColumn("date_ts", to_timestamp(col("date_str"), "dd/MM/yyyy HH:mm"))

df_with_ts.show() */


In [0]:
addcoulm = spark.table("csvtable")

In [0]:

from pyspark.sql.functions import current_timestamp
add=addcoulm.withColumn("current_timestamp", current_timestamp())
add.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("csvtable")


In [0]:
%sql
select * from csvtable


CustomerID,Name,Segment,Country,current_timestamp
C101,John Smith,Consumer,USA,2025-05-03T06:39:10.853+0000
C102,Jane Doe,Corporate,Canada,2025-05-03T06:39:10.853+0000
C103,Alice Brown,Home Office,UK,2025-05-03T06:39:10.853+0000


In [0]:
from pyspark.sql.functions import col, cast, to_timestamp

d111=spark.table("csvtable")

d121=d111.withColumn("New_stamp", col("current_timestamp").cast("timestamp"))
df_with_ts = df.withColumn("date_ts", to_timestamp(col("current_timestamp"), "dd/MM/yyyy"))

In [0]:


d121.display()
df_with_ts.display()

CustomerID,Name,Segment,Country,current_timestamp,New_stamp
C101,John Smith,Consumer,USA,2025-05-03T06:39:10.853+0000,2025-05-03T06:39:10.853+0000
C102,Jane Doe,Corporate,Canada,2025-05-03T06:39:10.853+0000,2025-05-03T06:39:10.853+0000
C103,Alice Brown,Home Office,UK,2025-05-03T06:39:10.853+0000,2025-05-03T06:39:10.853+0000


CustomerID,Name,Segment,Country,date_ts
C101,John Smith,Consumer,USA,2025-05-03T06:45:02.085+0000
C102,Jane Doe,Corporate,Canada,2025-05-03T06:45:02.085+0000
C103,Alice Brown,Home Office,UK,2025-05-03T06:45:02.085+0000


Extract a specific pattern from an existing string column.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize Spark Session
spark = SparkSession.builder.appName("EmailDataframe").getOrCreate()

# Define the schema
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])

# Create data for the DataFrame
data = [
    (1, "John Doe", "john.doe@example.com", 30, "New York"),
    (2, "Jane Smith", "jane.smith@example.com", 25, "Los Angeles"),
    (3, "Peter Jones", "peter.jones@example.com", 40, "Chicago"),
    (4, "Mary Brown", "mary.brown@example.com", 35, "Houston")
]

# Create the DataFrame
df = spark.createDataFrame(data, schema)

# Show the DataFrame
df.show()

# Print the schema of the DataFrame
df.printSchema()

+---+-----------+--------------------+---+-----------+
| id|       name|               email|age|       city|
+---+-----------+--------------------+---+-----------+
|  1|   John Doe|john.doe@example.com| 30|   New York|
|  2| Jane Smith|jane.smith@exampl...| 25|Los Angeles|
|  3|Peter Jones|peter.jones@examp...| 40|    Chicago|
|  4| Mary Brown|mary.brown@exampl...| 35|    Houston|
+---+-----------+--------------------+---+-----------+

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)



In [0]:
from pyspark.sql.functions import regexp_extract

df2=df.withColumn("New", regexp_extract("email", "@(\\w+\\.\\w+)", 1) )
df3=df.withColumn("New", regexp_extract("email", "\\.([a-zA-Z0-9]+)$", 1) ) 
df2.display()
df3.display()

id,name,email,age,city,New
1,John Doe,john.doe@example.com,30,New York,example.com
2,Jane Smith,jane.smith@example.com,25,Los Angeles,example.com
3,Peter Jones,peter.jones@example.com,40,Chicago,example.com
4,Mary Brown,mary.brown@example.com,35,Houston,example.com


id,name,email,age,city,New
1,John Doe,john.doe@example.com,30,New York,com
2,Jane Smith,jane.smith@example.com,25,Los Angeles,com
3,Peter Jones,peter.jones@example.com,40,Chicago,com
4,Mary Brown,mary.brown@example.com,35,Houston,com


🔹 Common Pattern Examples
Use Case	Regex Pattern	Group
Extract digits	"(\\d+)"	1
Extract text inside brackets	"\\[(.*?)\\]"	1
Extract URL from string	"(https?://[^\\s]+)"	1
Extract file extension	"\\.([a-zA-Z0-9]+)$"	1
Extract year from date	"^(\\d{4})-"	1

In [0]:
Utilize the dot syntax to extract nested data fields. 

In [0]:
df = spark.read.option("multiline", "true").json("/FileStore/tables/nested_json.json")
display(df)

user
"List(101, List(New York, USA), Alice)"


In [0]:
df.select("user.id", "user.name").show()

+---+-----+
| id| name|
+---+-----+
|101|Alice|
+---+-----+



In [0]:
from pyspark.sql.functions import col

df.select(
    col("user").getField("id").alias("Member id")
).show()

+---------+
|Member id|
+---------+
|      101|
+---------+



Identify the benefits of using array functions. 


ChatGPT said:
Using array functions in PySpark provides several key benefits, especially when working with complex or semi-structured data like JSON, nested schemas, or arrays within a column.

✅ Benefits of Using Array Functions in PySpark
1. Efficient Handling of Complex Data
Easily work with nested or repeated data structures without flattening them.

Ideal for processing JSON, event logs, or hierarchical datasets.

2. Simplifies Data Transformation
Functions like explode(), array_contains(), size(), sort_array(), etc., allow intuitive operations on array-type columns.

python
Copy
Edit
from pyspark.sql.functions import explode

# Explode an array column into multiple rows
df.select(explode("tags")).show()
3. Improves Readability and Maintainability
Avoids complex joins or manual loops when processing collections.

Array functions keep the logic concise and expressive.

4. Facilitates Advanced Analytics
Perform tasks like filtering arrays, checking membership, getting max/min values in arrays, etc., without custom UDFs.

python
Copy
Edit
from pyspark.sql.functions import array_contains

# Check if 'tags' array contains 'spark'
df.filter(array_contains("tags", "spark")).show()
5. Better Performance Over UDFs
Native array functions are optimized by Catalyst, PySpark's query engine, and run faster than Python-based UDFs.

🛠 Common PySpark Array Functions
Function	Description
explode()	Converts array elements into rows
array_contains()	Checks if an array contains a value
size()	Returns the number of elements in the array
sort_array()	Sorts array elements
array()	Creates an array column
flatten()	Flattens nested arrays

In [0]:
from pyspark.sql.functions import explode

df = spark.createDataFrame([(["python", "spark", "sql"],)], ["skills"])
df2=df.select(explode("skills").alias("skill")).show()


+------+
| skill|
+------+
|python|
| spark|
|   sql|
+------+



In [0]:
from pyspark.sql.functions import array_contains

df.filter(array_contains("skills", "spark")).show()


+--------------------+
|              skills|
+--------------------+
|[python, spark, sql]|
+--------------------+



In [0]:
from pyspark.sql.functions import size

df.select(size("skills").alias("num_skills")).show()


+----------+
|num_skills|
+----------+
|         3|
+----------+



In [0]:
from pyspark.sql.functions import sort_array

df.select(sort_array("skills").alias("sorted_skills")).show()


+--------------------+
|       sorted_skills|
+--------------------+
|[python, spark, sql]|
+--------------------+



In [0]:
from pyspark.sql.functions import array

df = spark.createDataFrame([(1, "A", "B")], ["id", "col1", "col2"])
df.select(array("col1", "col2").alias("combined_array")).show()


+--------------+
|combined_array|
+--------------+
|        [A, B]|
+--------------+



In [0]:
from pyspark.sql.functions import flatten

df = spark.createDataFrame([([["a", "b"], ["c"]],)], ["nested_array"])
df.select(flatten("nested_array").alias("flat_array")).show()


+----------+
|flat_array|
+----------+
| [a, b, c]|
+----------+



In [0]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

json_data = [
    ('{"name": "Alice", "age": 30}',),
    ('{"name": "Bob", "age": 25}',)
]

df = spark.createDataFrame(json_data, ["json_string"])
df.show(truncate=False)

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

df_parsed = df.withColumn("parsed", from_json(col("json_string"), schema))
df_parsed.select("parsed.*").show()


+----------------------------+
|json_string                 |
+----------------------------+
|{"name": "Alice", "age": 30}|
|{"name": "Bob", "age": 25}  |
+----------------------------+

+-----+---+
| name|age|
+-----+---+
|Alice| 30|
|  Bob| 25|
+-----+---+



In [0]:
from pyspark.sql import SparkSession

# Initialize Spark Session (only if not already done)
spark = SparkSession.builder.appName("JoinExample").getOrCreate()

# Create df1
data1 = [(1, "Alice"), (2, "Bob"), (3, "Carol")]
columns1 = ["id", "name"]
df1 = spark.createDataFrame(data1, columns1)

# Create df2
data2 = [(1, "New York"), (3, "Chicago"), (4, "LA")]
columns2 = ["id", "city"]
df2 = spark.createDataFrame(data2, columns2)

# Show both DataFrames
df1.show()
df2.show()


+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
|  3|Carol|
+---+-----+

+---+--------+
| id|    city|
+---+--------+
|  1|New York|
|  3| Chicago|
|  4|      LA|
+---+--------+



In [0]:
join=df1.join(df2,df1.id==df2.id,"inner")
join.display()

id,name,id.1,city
1,Alice,1,New York
3,Carol,3,Chicago


In [0]:
leftjoin=df1.join(df2,df1.id==df2.id,"left")
leftjoin.display()

id,name,id.1,city
1,Alice,1.0,New York
2,Bob,,
3,Carol,3.0,Chicago


In [0]:
rightjoin=df1.join(df2,df1.id==df2.id,"right")
rightjoin.display()

id,name,id.1,city
1.0,Alice,1,New York
3.0,Carol,3,Chicago
,,4,LA


In [0]:
fulljoin=df1.join(df2,df1.id==df2.id,"full")
fulljoin.display()

id,name,id.1,city
1.0,Alice,1.0,New York
2.0,Bob,,
3.0,Carol,3.0,Chicago
,,4.0,LA


#left_semi
#left_anti

Function	Input Type	Output Type	Use Case
explode()	Array column in a row	One row per element	When you want to convert array elements into separate rows
flatten()	Array of arrays (ArrayType(ArrayType(...)))	Single array	When you want to flatten nested arrays into one array column

In [0]:
from pyspark.sql.functions import explode

data = [(1, ["red", "blue"]), (2, ["green", "yellow"])]
df = spark.createDataFrame(data, ["id", "colors"])

df_exploded = df.select("id", explode("colors").alias("color"))
df_exploded.show()

from pyspark.sql.functions import flatten

data = [([["a", "b"], ["c"]],)]
df = spark.createDataFrame(data, ["nested_array"])

df_flat = df.select(flatten("nested_array").alias("flat_array"))
df_flat.show()


+---+------+
| id| color|
+---+------+
|  1|   red|
|  1|  blue|
|  2| green|
|  2|yellow|
+---+------+

+----------+
|flat_array|
+----------+
| [a, b, c]|
+----------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

# Sample data
data = [
    ("East", "Jan", 100),
    ("East", "Feb", 120),
    ("West", "Jan", 90),
    ("West", "Feb", 110)
]

# Create DataFrame
df = spark.createDataFrame(data, ["region", "month", "sales"])
df.display()
# Pivot: transform months into columns
pivot=df.groupBy("region").pivot("month").agg(sum("sales"))
pivot.display()


region,month,sales
East,Jan,100
East,Feb,120
West,Jan,90
West,Feb,110


region,Feb,Jan
East,120,100
West,110,90


In [0]:
%sql
CREATE OR REPLACE FUNCTION add_prefix(name STRING)
RETURNS STRING
RETURN CONCAT('Mr./Ms. ', name);

SELECT add_prefix('Alice') AS greeting;


greeting
Mr./Ms. Alice


In [0]:
from pyspark.sql.functions import col, sum, avg

# Checking function location
print(col.__module__)  # Output: pyspark.sql.functions
print(sum.__module__)  # Output: pyspark.sql.functions


pyspark.sql.functions
pyspark.sql.functions


In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define UDF
def add_prefix(name):
    return "Mr./Ms. " + name

# Register UDF
add_prefix_udf = udf(add_prefix, StringType())

# Use the UDF
df = spark.createDataFrame([("Alice",), ("Bob",)], ["name"])
df.withColumn("greeting", add_prefix_udf("name")).show()


+-----+-------------+
| name|     greeting|
+-----+-------------+
|Alice|Mr./Ms. Alice|
|  Bob|  Mr./Ms. Bob|
+-----+-------------+



In [0]:
print(add_prefix_udf.__module__)

__main__


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

# Create Spark session
spark = SparkSession.builder.appName("case_when_example").getOrCreate()

# Sample data
data = [(1, "Alice", 25),
        (2, "Bob", 16),
        (3, "Carol", 67)]

columns = ["id", "name", "age"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

In [0]:
from pyspark.sql.functions import when, col

df.withColumn("new", when(col("id")==1,"A").when(col("id")==2,"B").otherwise("C")).display()

id,name,age,new
1,Alice,25,A
2,Bob,16,B
3,Carol,67,C


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

# Create Spark session
spark = SparkSession.builder.appName("custom_control_flow").getOrCreate()

# Sample sales data
data = [(1, "Alice", 3000),
        (2, "Bob", 12000),
        (3, "Carol", 25000),
        (4, "David", 7000)]

columns = ["id", "name", "sales_amount"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Apply custom control flow using CASE/WHEN to categorize sales
df = df.withColumn(
    "sales_category",
    when(col("sales_amount") < 5000, "Low Sales")
    .when(col("sales_amount").between(5000, 20000), "Average Sales")
    .otherwise("High Sales")
)

df.show()


+---+-----+------------+--------------+
| id| name|sales_amount|sales_category|
+---+-----+------------+--------------+
|  1|Alice|        3000|     Low Sales|
|  2|  Bob|       12000| Average Sales|
|  3|Carol|       25000|    High Sales|
|  4|David|        7000| Average Sales|
+---+-----+------------+--------------+

