<a href="https://colab.research.google.com/github/mayurjainf007/PySpark_Interview_Questions/blob/notebook/_notebooks/2021-06-22-hello-pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
!pip install pyspark



In [10]:
# Initialize Spark session
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col,split

spark = SparkSession.builder.appName('Spark Playground').getOrCreate()

schema = StructType([
StructField("Emp_ID", IntegerType(), True),
StructField("Name", StringType(), True),
StructField("Age", IntegerType(), True),
StructField("Salary", IntegerType(), True),
StructField("Department", StringType(), True),
StructField("Address", StringType(), True)
])


data = [ (101, "Rajesh", 30, 60000, "IT", "Mumbai, Maharashtra"),
(102, "Priya", 28, 75000, "HR", "Bengaluru, Karnataka"),
(103, "Suresh", 35, 50000, "Finance", "Chennai, Tamil Nadu"),
(104, "Anjali", 25, 80000, "IT", "Pune, Maharashtra"), (105, "Arjun", 40, 90000, "Management", "Hyderabad, Telangana") ]

df = spark.createDataFrame(data, schema)
display(df)

filtered_df = df.filter((col('Department')=='IT') & (col('Salary') > 70000))
df_transformed = filtered_df.withColumn('City', split(col('Address'),',')[0]) \
                            .withColumn('State', split(col('Address'),',')[1]) \
                            .drop('Address')
df_transformed.show()
df_transformed.write.mode('overwrite').parquet('output.parquet')
spark.stop()

DataFrame[Emp_ID: int, Name: string, Age: int, Salary: int, Department: string, Address: string]

+------+------+---+------+----------+----+------------+
|Emp_ID|  Name|Age|Salary|Department|City|       State|
+------+------+---+------+----------+----+------------+
|   104|Anjali| 25| 80000|        IT|Pune| Maharashtra|
+------+------+---+------+----------+----+------------+



In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, rank, avg

spark = SparkSession.builder.appName("WindowFunctionExample").getOrCreate()

# Sample data
data = [
    (1, "HR", 5000),
    (2, "HR", 6000),
    (3, "HR", 4500),
    (4, "IT", 7000),
    (5, "IT", 7200),
    (6, "IT", 7100),
    (7, "Finance", 6500),
    (8, "Finance", 6300),
]

# Create DataFrame
df = spark.createDataFrame(data, ["employee_id", "department", "salary"])

rank_window = Window.partitionBy("department").orderBy(col("salary").desc())
df_with_rank = df.withColumn("rank", rank().over(rank_window))
df_with_rank.show()

avg_salary_window = Window.partitionBy("department")
df_with_avg_salary = df.withColumn("avg_salary", avg(col("salary")).over(avg_salary_window))
df_with_avg_salary.show()

top_2_employees = df_with_rank.filter(col("rank") <= 2)
top_2_employees.show()

top_2_employees_by_dept = df_with_rank.filter(col("rank") <= 2).orderBy(col("department"), col("rank"))
top_2_employees_by_dept.show()

top_2_emplyoees_by_avg_salary = df_with_avg_salary.filter(col("salary") >= col("avg_salary"))
top_2_emplyoees_by_avg_salary.show()

+-----------+----------+------+----+
|employee_id|department|salary|rank|
+-----------+----------+------+----+
|          7|   Finance|  6500|   1|
|          8|   Finance|  6300|   2|
|          2|        HR|  6000|   1|
|          1|        HR|  5000|   2|
|          3|        HR|  4500|   3|
|          5|        IT|  7200|   1|
|          6|        IT|  7100|   2|
|          4|        IT|  7000|   3|
+-----------+----------+------+----+

+-----------+----------+------+-----------------+
|employee_id|department|salary|       avg_salary|
+-----------+----------+------+-----------------+
|          7|   Finance|  6500|           6400.0|
|          8|   Finance|  6300|           6400.0|
|          1|        HR|  5000|5166.666666666667|
|          2|        HR|  6000|5166.666666666667|
|          3|        HR|  4500|5166.666666666667|
|          4|        IT|  7000|           7100.0|
|          5|        IT|  7200|           7100.0|
|          6|        IT|  7100|           7100.0|
+----

In [17]:
df_with_window = df.withColumn("rank", rank().over(rank_window))\
                    .withColumn("avg_salary", avg(col("salary")).over(avg_salary_window))
top_employees = df_with_window.filter(col("rank") <= 2)
top_employees.show()

spark.stop()

+-----------+----------+------+----+-----------------+
|employee_id|department|salary|rank|       avg_salary|
+-----------+----------+------+----+-----------------+
|          7|   Finance|  6500|   1|           6400.0|
|          8|   Finance|  6300|   2|           6400.0|
|          2|        HR|  6000|   1|5166.666666666667|
|          1|        HR|  5000|   2|5166.666666666667|
|          5|        IT|  7200|   1|           7100.0|
|          6|        IT|  7100|   2|           7100.0|
+-----------+----------+------+----+-----------------+



In [2]:
user_ids = [101, 102, 103, 104, 105]
# Your code here
even_square_id = {id: id**2 for id in user_ids if id%2==0}
print(even_square_id)


{102: 10404, 104: 10816}


In [3]:
from functools import reduce

nums = [1, 2, 3, 4]
doubled = list(map(lambda x: x * 2, nums))         # [2, 4, 6, 8]
evens = list(filter(lambda x: x % 2 == 0, nums))   # [2, 4]
product = reduce(lambda x, y: x * y, nums)         # 24

In [4]:
def even_generator(n):
  for i in range(n):
    if i%2 == 0:
      yield i

gen = even_generator(20)
for num in gen:
    print(num)

0
2
4
6
8
10
12
14
16
18


In [5]:
def divide(a,b):
  try:
    res = a/b
  except ZeroDivisionError:
    print('Not divisible by zero')
  else:
    print(res)
  finally:
    print('Done')

divide(256,4)
divide(12,0)

64.0
Done
Not divisible by zero
Done


In [7]:
import re

text = "User123 logged in at 10:23AM"
match = re.search(r'\d{2}:\d{2}[AP]M', text)
print(match.group())  # 10:23AM

pattern = re.compile(r'\d+')
print(pattern.findall('12 drummers drumming, 11 pipers piping, 10 lords a-leaping'))


10:23AM
['12', '11', '10']


In [16]:
text = "Reach out to us at support@example.com or careers@meta.com."
emails = re.findall(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', text)
print(emails)

['support@example.com', 'careers@meta.com']


In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col, split, from_unixtime

spark = SparkSession.builder.appName('Spark Playground').getOrCreate()

data = [ (1, "2025-01-31 08:00:00", "2025-01-31 10:30:45"),
(2, "2025-01-31 09:00:30", "2025-01-31 12:15:10"),
(3, "2025-01-31 07:45:00", "2025-01-31 09:00:15") ]

schema = ["user_id", "login_timestamp", "logout_timestamp"]
df = spark.createDataFrame(data, schema)


df = df.withColumn('login_timestamp', col('login_timestamp').cast('timestamp'))
df = df.withColumn('logout_timestamp', col('logout_timestamp').cast('timestamp'))
df = df.withColumn('duration_seconds', col('logout_timestamp').cast('long') - col('login_timestamp').cast('long'))
# in HH:mm:ss
df = df.withColumn('duration_hms', from_unixtime(col('duration_seconds'), 'HH:mm:ss'))

df.show()
df.printSchema()

+-------+-------------------+-------------------+----------------+------------+
|user_id|    login_timestamp|   logout_timestamp|duration_seconds|duration_hms|
+-------+-------------------+-------------------+----------------+------------+
|      1|2025-01-31 08:00:00|2025-01-31 10:30:45|            9045|    02:30:45|
|      2|2025-01-31 09:00:30|2025-01-31 12:15:10|           11680|    03:14:40|
|      3|2025-01-31 07:45:00|2025-01-31 09:00:15|            4515|    01:15:15|
+-------+-------------------+-------------------+----------------+------------+

root
 |-- user_id: long (nullable = true)
 |-- login_timestamp: timestamp (nullable = true)
 |-- logout_timestamp: timestamp (nullable = true)
 |-- duration_seconds: long (nullable = true)
 |-- duration_hms: string (nullable = true)



In [7]:
# 𝐐𝐮𝐞𝐬𝐭𝐢𝐨𝐧:
# You are given a dataset containing sales data for different stores across various months. Each row contains the store name, the month, and the sales amount. Your task is to calculate the cumulative sales for each store, considering the monthly sales, using PySpark.

# You should also:
# Filter out stores with sales lower than 1000 in any month.
# Calculate the total sales for each store over all months.
# Sort the results by the total sales in descending order.

# 𝐬𝐜𝐡𝐞𝐦𝐚 𝐚𝐧𝐝 𝐝𝐚𝐭𝐚𝐬𝐞𝐭

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col, split, from_unixtime, sum
from pyspark.sql.window import Window

spark = SparkSession.builder.appName('Spark Playground').getOrCreate()

data = [
    ("Store A", "2024-01", 800),
    ("Store A", "2024-02", 1200),
    ("Store A", "2024-03", 900),
    ("Store B", "2024-01", 1500),
    ("Store B", "2024-02", 1600),
    ("Store B", "2024-03", 1400),
    ("Store C", "2024-01", 700),
    ("Store C", "2024-02", 1000),
    ("Store C", "2024-03", 800)
]

df = spark.createDataFrame(data, ["Store", "Month", "Sales"])

# 1
df = df.filter(col('Sales') >= 1000)
# 2
window_spec = Window.partitionBy('Store').orderBy(col('Month'))
df = df.withColumn('total_sales', sum(col('Sales')).over(window_spec))
# 3
df = df.sort(col('total_sales').desc())

df.show()

+-------+-------+-----+-----------+
|  Store|  Month|Sales|total_sales|
+-------+-------+-----+-----------+
|Store B|2024-03| 1400|       4500|
|Store B|2024-02| 1600|       3100|
|Store B|2024-01| 1500|       1500|
|Store A|2024-02| 1200|       1200|
|Store C|2024-02| 1000|       1000|
+-------+-------+-----+-----------+



In [14]:
# 𝐐𝐮𝐞𝐬𝐭𝐢𝐨𝐧
# You are given a dataset of stock prices with the following columns:

# - stock_id: Unique identifier for the stock.
# - date: The date of the stock price.
# - price: The price of the stock on the given date.

# Your task is to calculate the 3-day rolling average of the stock price (rolling_avg) for each stock (stock_id) using a sliding window,
# ensuring the results are partitioned by stock_id and ordered by date.

# 𝐬𝐜𝐡𝐞𝐦𝐚

from pyspark.sql.functions import avg, col, format_number
from pyspark.sql.window import Window

data = [
    ("A", "2023-01-01", 100), ("A", "2023-01-02", 105),
    ("A", "2023-01-03", 110), ("A", "2023-01-04", 120),
    ("B", "2023-01-01", 50), ("B", "2023-01-02", 55),
    ("B", "2023-01-03", 60), ("B", "2023-01-04", 65),
]

schema = ["stock_id", "date", "price"]

df = spark.createDataFrame(data, schema)

window_spec = Window.partitionBy('Stock_id').orderBy('date').rowsBetween(-2, 0)
df = df.withColumn('rolling_avg', avg(col('price')).over(window_spec))
# rolling_avg to be upto 2 decimal
df = df.withColumn('rolling_avg', format_number(col('rolling_avg'), 2))
df.show()

+--------+----------+-----+-----------+
|stock_id|      date|price|rolling_avg|
+--------+----------+-----+-----------+
|       A|2023-01-01|  100|     100.00|
|       A|2023-01-02|  105|     102.50|
|       A|2023-01-03|  110|     105.00|
|       A|2023-01-04|  120|     111.67|
|       B|2023-01-01|   50|      50.00|
|       B|2023-01-02|   55|      52.50|
|       B|2023-01-03|   60|      55.00|
|       B|2023-01-04|   65|      60.00|
+--------+----------+-----+-----------+



In [25]:


import requests

response = requests.get("https://jsonplaceholder.typicode.com/users")
data = response.json()

for i in data:
  print(i['name'],' : ',i['email'])

Leanne Graham  :  Sincere@april.biz
Ervin Howell  :  Shanna@melissa.tv
Clementine Bauch  :  Nathan@yesenia.net
Patricia Lebsack  :  Julianne.OConner@kory.org
Chelsey Dietrich  :  Lucio_Hettinger@annie.ca
Mrs. Dennis Schulist  :  Karley_Dach@jasper.info
Kurtis Weissnat  :  Telly.Hoeger@billy.biz
Nicholas Runolfsdottir V  :  Sherwood@rosamond.me
Glenna Reichert  :  Chaim_McDermott@dana.io
Clementina DuBuque  :  Rey.Padberg@karina.biz


In [48]:
import pandas as pd
import requests
import re

# Steps:

def fetch_and_clean_users(api_url: str) -> pd.DataFrame:

  # Fetch data from API:
  response = requests.get(api_url)
  data = response.json()

  # Clean it:
  # Extract name, email, and address.city
  cleaned_data = []
  for i in data:
    cleaned_data.append({
        'name': i['name'],
        'email': i['email'],
        'city': i['address']['city']
    })

  # Normalize names (title case), validate emails using regex
  for i in cleaned_data:
    i['name'] = i['name'].title()
    if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', i['email']):
      raise ValueError(f"Invalid email address: {i['email']}")

  # Transform to DataFrame:
  # Use pandas, handle missing values (simulate some), drop duplicates
  df = pd.DataFrame(cleaned_data)
  df = df.dropna()
  df = df.drop_duplicates()


  # Save to CSV locally:
  # Call it cleaned_users.csv
  pd.DataFrame.to_csv(df, 'cleaned_users.csv')
  return df

# Bonus:
# Wrap your code in a function with try-except
# Add CLI support using argparse or sys.argv
try:
  url = "https://jsonplaceholder.typicode.com/users"
  fetch_and_clean_users(url)
except Exception as e:
  print('Error :', e)



import unittest
import pandas as pd

class TestUserDataPipeline(unittest.TestCase):

    def setUp(self):
        self.api_url = "https://jsonplaceholder.typicode.com/users"
        self.df = fetch_and_clean_users(self.api_url)

    def test_columns_exist(self):
        """Ensure required columns are present"""
        expected_columns = {'name', 'email', 'city'}
        self.assertTrue(expected_columns.issubset(set(self.df.columns)))

    def test_email_format(self):
        """Check email validity using regex"""
        import re
        pattern = re.compile(r"[^@]+@[^@]+\.[^@]+")
        invalid_emails = self.df[~self.df['email'].apply(lambda x: bool(pattern.match(x)))]
        self.assertTrue(invalid_emails.empty, "Found invalid email formats")

    def test_name_title_case(self):
        """Check all names are in title case"""
        self.assertTrue(all(name == name.title() for name in self.df['name']))

    def test_city_not_null(self):
        """Ensure no null cities"""
        self.assertFalse(self.df['city'].isnull().any())

    def test_dataframe_not_empty(self):
        """Ensure DataFrame has at least one row"""
        self.assertGreater(len(self.df), 0)

unittest.main(argv=[''], verbosity=2, exit=False)

import re
import pytest
import pandas as pd

@pytest.fixture(scope="module")
def cleaned_df():
    api_url = "https://jsonplaceholder.typicode.com/users"
    return fetch_and_clean_users(api_url)

def test_columns_exist(cleaned_df):
    expected_columns = {'name', 'email', 'city'}
    assert expected_columns.issubset(set(cleaned_df.columns)), "Missing required columns"

def test_email_format(cleaned_df):
    pattern = re.compile(r"[^@]+@[^@]+\.[^@]+")
    invalid = cleaned_df[~cleaned_df['email'].apply(lambda x: bool(pattern.match(x)))]
    assert invalid.empty, f"Invalid emails found: {invalid['email'].tolist()}"

def test_name_title_case(cleaned_df):
    non_title_names = [name for name in cleaned_df['name'] if name != name.title()]
    assert not non_title_names, f"Names not in title case: {non_title_names}"

def test_city_not_null(cleaned_df):
    assert not cleaned_df['city'].isnull().any(), "Null values found in 'city' column"

def test_dataframe_not_empty(cleaned_df):
    assert len(cleaned_df) > 0, "DataFrame is empty"

pytest.main()


test_city_not_null (__main__.TestUserDataPipeline.test_city_not_null)
Ensure no null cities ... ok
test_columns_exist (__main__.TestUserDataPipeline.test_columns_exist)
Ensure required columns are present ... ok
test_dataframe_not_empty (__main__.TestUserDataPipeline.test_dataframe_not_empty)
Ensure DataFrame has at least one row ... ok
test_email_format (__main__.TestUserDataPipeline.test_email_format)
Check email validity using regex ... ok
test_name_title_case (__main__.TestUserDataPipeline.test_name_title_case)
Check all names are in title case ... ok

----------------------------------------------------------------------
Ran 5 tests in 0.319s

OK
ERROR: usage: colab_kernel_launcher.py [options] [file_or_dir] [file_or_dir] [...]
colab_kernel_launcher.py: error: unrecognized arguments: -f
  inifile: None
  rootdir: /root/.local/share/jupyter/runtime



<ExitCode.USAGE_ERROR: 4>