### Q1. Splitting strings that use different delimiters (PySpark)

In [0]:
from pyspark.sql.functions import split

# Sample data: every record holds the same four fields (id, name, age, city),
# but the separator between fields varies between ',', a tab and '|'.
data = [
    "1,Alice\t30|New York",
    "2|Bob,25\tLos Angeles",
    "3\tCharlie|35,San Francisco",
    "4|David,40|Boston",
]

# A list of plain strings plus the atomic schema 'string' yields a
# single-column DataFrame; that column is addressed as 'value' below.
df = spark.createDataFrame(data, 'string')

# split() takes a regular expression. A raw-string character class matches
# any one of the three delimiters and avoids the invalid Python escape '\|'
# that the non-raw pattern contained.
split_cols = split(df['value'], r'[,\t|]')

# Field i of the split array becomes the i-th named column.
field_names = ['id', 'name', 'age', 'city']
for position, field_name in enumerate(field_names):
    df = df.withColumn(field_name, split_cols.getItem(position))

df.select('id', 'name', 'age', 'city').show()

+---+-------+---+-------------+
| id|   name|age|         city|
+---+-------+---+-------------+
|  1|  Alice| 30|     New York|
|  2|    Bob| 25|  Los Angeles|
|  3|Charlie| 35|San Francisco|
|  4|  David| 40|       Boston|
+---+-------+---+-------------+



### Q2. Missing Numbers

In [0]:
# Numbers that are actually present (3 and 9 are absent from the 1..10 run).
data = [1, 2, 4, 5, 6, 7, 8, 10]
# Each bare int is wrapped in a 1-tuple so that it forms a one-column row.
df_1 = spark.createDataFrame([(value,) for value in data], ["number"])

# Reference DataFrame holding every integer from 1 through 10
# (the end bound passed to range() is exclusive).
df_2 = spark.range(1, 11).toDF("number")

# A left-anti join keeps only the rows of df_2 that have no match in df_1.
missing_numbers = df_2.join(df_1, on='number', how='left_anti')
missing_numbers.show()



+------+
|number|
+------+
|     3|
|     9|
+------+




Why did Q1 not need tuples while Q2 does? It is not because the Q1 values are strings. In Q1 an explicit atomic schema (`'string'`) was passed to `createDataFrame`. When the schema is a single data type rather than a struct, PySpark wraps every value in a one-field row and names that field `value`, which is why Q1 splits `df['value']`.

In Q2 the second argument is a list of column names (`["number"]`), so PySpark has to infer the column types from the rows themselves. Each row must therefore be row-like (a tuple, list, dict or `Row`). A bare number is not row-like, and neither is a bare string, so schema inference fails with a "Can not infer schema" error. Converting each number to a single-element tuple gives the row a structure that PySpark recognizes as one column. Alternatively, `spark.createDataFrame(data, 'int')` works without tuples, but the column is then named `value` instead of `number`.
Summary
With an atomic type schema (`'string'`, `'int'`, ...): a list of bare values is accepted, each value becomes one row, and the single column is named `value`.
With column names only (or no schema): every element must be row-like (tuple, list, dict, `Row`), so bare numbers or strings must first be wrapped, e.g. `[(x,) for x in data]`.

### Q3. Top 3 movies

In [0]:
# Sample DataFrames
from pyspark.sql.functions import avg, col

# (MovieID, MovieName)
data_movies = [(1, "Movie A"), (2, "Movie B"), (3, "Movie C"), (4, "Movie D"), (5, "Movie E")]

# (MovieID, UserID, Rating)
data_ratings = [(1, 101, 4.5), (1, 102, 4.0), (2, 103, 5.0),
                (2, 104, 3.5), (3, 105, 4.0), (3, 106, 4.0),
                (4, 107, 3.0), (5, 108, 2.5), (5, 109, 3.0)]

columns_movies = ["MovieID", "MovieName"]
columns_ratings = ["MovieID", "UserID", "Rating"]

movies_df = spark.createDataFrame(data_movies, columns_movies)
ratings_df = spark.createDataFrame(data_ratings, columns_ratings)

# Attach the movie title to every rating, then average the ratings per movie.
rated_movies = movies_df.join(ratings_df, 'MovieID', 'inner')
top_movies = (
    rated_movies
    .groupBy('MovieID', 'MovieName')
    # Reference the column with its declared spelling ('Rating').
    .agg(avg('Rating').alias("avg_rating"))
    # Movie A and Movie B tie on avg_rating; the MovieID tie-breaker makes
    # both the displayed order and the rows kept by limit(3) deterministic.
    .orderBy(col('avg_rating').desc(), col('MovieID').asc())
    .limit(3)
)
top_movies.show()





+-------+---------+----------+
|MovieID|MovieName|avg_rating|
+-------+---------+----------+
|      1|  Movie A|      4.25|
|      2|  Movie B|      4.25|
|      3|  Movie C|       4.0|
+-------+---------+----------+



### Q4. Rolling average (cumulative: all rows from the start up to the current row)

In [0]:
from pyspark.sql.window import Window
# col is imported here as well so the cell does not depend on an earlier
# cell having imported it.
from pyspark.sql.functions import avg, col, to_date

# (date, productid, quantitysold); note that some dates occur twice.
data = [('2023-01-01', 100, 10),
        ('2023-01-02', 100, 15),
        ('2023-01-02', 100, 20),
        ('2023-01-04', 100, 25),
        ('2023-01-04', 100, 30),
        ('2023-01-06', 100, 35),
        ('2023-01-07', 100, 40),
        ('2023-01-08', 100, 45),
        ('2023-01-09', 100, 50)
        ]
columns = ['date', 'productid', 'quantitysold']
df = spark.createDataFrame(data, columns)
# Parse the ISO strings into real dates so the ordering is chronological.
df = df.withColumn('date', to_date(col('date'), 'yyyy-MM-dd'))

# Frame = every row from the start of the partition through the current row,
# i.e. a cumulative (running) average rather than a fixed-size window.
# 'quantitysold' is a secondary sort key: with a ROWS frame, rows sharing the
# same date would otherwise be ordered arbitrarily, making the per-row
# averages non-deterministic.
window_col = (
    Window.partitionBy('productid')
    .orderBy('date', 'quantitysold')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df = df.withColumn('rolling_avg', avg('quantitysold').over(window_col))
df.show()

+----------+---------+------------+-----------+
|      date|productid|quantitysold|rolling_avg|
+----------+---------+------------+-----------+
|2023-01-01|      100|          10|       10.0|
|2023-01-02|      100|          15|       12.5|
|2023-01-02|      100|          20|       15.0|
|2023-01-04|      100|          25|       17.5|
|2023-01-04|      100|          30|       20.0|
|2023-01-06|      100|          35|       22.5|
|2023-01-07|      100|          40|       25.0|
|2023-01-08|      100|          45|       27.5|
|2023-01-09|      100|          50|       30.0|
+----------+---------+------------+-----------+



### Q5. udf function

In [0]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
# (user_id, age) sample rows.
columns = ['user_id', 'age']
data = [(4001, 17), (4002, 45), (4003, 65), (4004, 30), (4005, 80)]
df = spark.createDataFrame(data, schema=columns)
def age_categorisation(n):
    """Map an age to a coarse age-group label.

    Args:
        n: Age in years, or None when the age is missing.

    Returns:
        'young' for ages below 25, 'adult' for ages from 25 up to (but not
        including) 60, 'senior citizen' for 60 and above, and None when
        ``n`` is None.
    """
    # A missing age is passed in as None; comparing None with an int raises
    # TypeError, so propagate the missing value instead of failing.
    if n is None:
        return None
    if n < 25:
        return 'young'
    if n < 60:
        return 'adult'
    return 'senior citizen'
# Wrap the plain Python function as a Spark UDF that returns strings.
age_udf = udf(age_categorisation, returnType=StringType())
# Derive the 'category' column from each row's age.
df = df.withColumn('category', age_udf(df['age']))
df.show()

+-------+---+--------------+
|user_id|age|      category|
+-------+---+--------------+
|   4001| 17|         young|
|   4002| 45|         adult|
|   4003| 65|senior citizen|
|   4004| 30|         adult|
|   4005| 80|senior citizen|
+-------+---+--------------+



### Q6. Find the count of unique visitors to a website per day.

In [0]:
from pyspark.sql import Row
from pyspark.sql.functions import to_date, countDistinct

# One Row per visit; visitor 101 appears twice on 2023-01-01.
visitor_data = [
    Row(Date=visit_date, VisitorID=visitor)
    for visit_date, visitor in [
        ('2023-01-01', 101),
        ('2023-01-01', 102),
        ('2023-01-01', 101),
        ('2023-01-02', 103),
        ('2023-01-02', 101),
    ]
]

# Build the DataFrame, parse the date strings, then count the distinct
# visitors seen on each day.
df_visitors = (
    spark.createDataFrame(visitor_data)
    .withColumn('Date', to_date(col('Date'), 'yyyy-MM-dd'))
    .groupBy('Date')
    .agg(countDistinct('VisitorID').alias('unique visitors'))
)
df_visitors.show()

+----------+---------------+
|      Date|unique visitors|
+----------+---------------+
|2023-01-01|              2|
|2023-01-02|              2|
+----------+---------------+



### Q7. Determine the first purchase date for each user

In [0]:
# Imported under an alias so that Python's builtin min() is not shadowed by
# the Spark column function for the rest of the notebook session.
from pyspark.sql.functions import min as spark_min

# One Row per purchase; user 1 has two purchases.
purchase_data = [
    Row(UserID=1, PurchaseDate='2023-01-05'),
    Row(UserID=1, PurchaseDate='2023-01-10'),
    Row(UserID=2, PurchaseDate='2023-01-03'),
    Row(UserID=3, PurchaseDate='2023-01-12')
]

df_purchases = spark.createDataFrame(purchase_data)
# Parse the ISO strings into dates so the minimum is taken chronologically.
df_purchases = df_purchases.withColumn("PurchaseDate", to_date(col('PurchaseDate'), 'yyyy-MM-dd'))
# Earliest purchase date per user.
df_purchases = df_purchases.groupBy('UserId').agg(spark_min('PurchaseDate').alias('FirstPurchaseDate'))
df_purchases.show()

+------+-----------------+
|UserId|FirstPurchaseDate|
+------+-----------------+
|     1|       2023-01-05|
|     2|       2023-01-03|
|     3|       2023-01-12|
+------+-----------------+



### Q8. Generate a sequential number for each row within each group, ordered by date.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# (GroupID, Date) sample rows: two dates for each of the groups A and B.
group_data = [
    Row(GroupID=group, Date=day)
    for group, day in [
        ('A', '2023-01-01'),
        ('A', '2023-01-02'),
        ('B', '2023-01-01'),
        ('B', '2023-01-03'),
    ]
]

# Numbering restarts at 1 inside every group and follows the date order.
window_spec = Window.partitionBy('GroupId').orderBy('Date')

df_group = (
    spark.createDataFrame(group_data)
    # Cast the string column to a date type before ordering by it.
    .withColumn('Date', col('Date').cast('date'))
    .withColumn('rn', row_number().over(window_spec))
)
df_group.show()

+-------+----------+---+
|GroupID|      Date| rn|
+-------+----------+---+
|      A|2023-01-01|  1|
|      A|2023-01-02|  2|
|      B|2023-01-01|  1|
|      B|2023-01-03|  2|
+-------+----------+---+



#### Q9. Problem: Given a dataset of sales records, identify and replace all missing values in the 'amount' column with the average sales amount.

In [0]:
from pyspark.sql.functions import mean

# (sale_id, amount); None marks a missing amount.
sales_data = [("1", 100), ("2", 150), ("3", None), ("4", 200), ("5", None)]
sales_df = spark.createDataFrame(sales_data, ["sale_id", "amount"])

# mean() skips nulls by itself. Calling na.drop() first would also discard
# rows whose null sits in a *different* column and so distort the average.
avg_amount = sales_df.agg(mean(col('amount'))).first()[0]

if avg_amount is None:
    # No non-null amount exists, so there is no average to fill with.
    sales_df_filled = sales_df
else:
    # Restrict the fill to 'amount'; an unrestricted fill would also overwrite
    # nulls in any other numeric column.
    # NOTE(review): 'amount' is an integer column here, so a fractional mean
    # may be truncated on fill - cast the column to double first if the exact
    # mean matters.
    sales_df_filled = sales_df.na.fill({'amount': avg_amount})
sales_df_filled.show()

+-------+------+
|sale_id|amount|
+-------+------+
|      1|   100|
|      2|   150|
|      3|   150|
|      4|   200|
|      5|   150|
+-------+------+



#### Q10. unpivoting 
Problem: Given a dataset of sales records with monthly sales per product, reshape the data to have one row per product-month combination.

In [0]:
from pyspark.sql.functions import expr

# Wide layout: one row per product, one column per month.
columns = ["Product", "Sales_Jan", "Sales_Feb", "Sales_Mar"]
data = [
    ("Product1", 100, 150, 200),
    ("Product2", 200, 250, 300),
    ("Product3", 300, 350, 400),
]
df = spark.createDataFrame(data, columns)

# stack(3, ...) emits three (Month, Amount) rows for every input row,
# pairing each month label with the matching sales column.
month_amount_pairs = expr("stack(3, 'Jan', Sales_Jan, 'Feb', Sales_Feb, 'Mar', Sales_Mar) as (Month, Amount)")
df = df.select('product', month_amount_pairs)
df.show()


+--------+-----+------+
| product|Month|Amount|
+--------+-----+------+
|Product1|  Jan|   100|
|Product1|  Feb|   150|
|Product1|  Mar|   200|
|Product2|  Jan|   200|
|Product2|  Feb|   250|
|Product2|  Mar|   300|
|Product3|  Jan|   300|
|Product3|  Feb|   350|
|Product3|  Mar|   400|
+--------+-----+------+



### Q11 . 
You are given a DataFrame of students named students_df. Write a function named grade_colors that selects only the rows
where the student's favourite color is either green or red and the grade is above 90.
student df == name, age, favourite_clolor, grade 

In [0]:
# (name, age, favourite_clolor, grade) sample rows.
data = [("a", 19, 'red', 91), ("b", 20, 'yellow', 95), ("c", 21, 'green', 82), ("d", 20, 'blue', 75), ("e", 23, 'green', 93)]
columns = [ "name", "age", "favourite_clolor", "grade" ]
students_df = spark.createDataFrame(data, columns)


def grade_colors(df):
    """Return the rows whose favourite colour is green or red and whose grade exceeds 90.

    Args:
        df: DataFrame with at least the columns 'favourite_clolor' and 'grade'.

    Returns:
        A DataFrame containing only the rows that satisfy both conditions.
    """
    likes_green_or_red = col("favourite_clolor").isin('green', 'red')
    scored_above_90 = col('grade') > 90
    return df.filter(likes_green_or_red & scored_above_90)


# The task statement asks for the name grade_colors; the earlier spelling is
# kept as an alias so existing calls keep working.
grade_colr = grade_colors

result_df = grade_colors(students_df)
result_df.show()

+----+---+----------------+-----+
|name|age|favourite_clolor|grade|
+----+---+----------------+-----+
|   a| 19|             red|   91|
|   e| 23|           green|   93|
+----+---+----------------+-----+



### Q.12 Explode with split

In [0]:
from pyspark.sql.functions import explode

# (empId, name, comma-separated Locations)
data = [
    ('1', 'Gaurav', 'Pune,Bangalore,Hyderabad'),
    ('1', 'Rishab', 'Pune,Bangalore,Mumbai'),
]
columns = ['empId', 'name', 'Locations']
employee_df = spark.createDataFrame(data, columns)

# Split the comma-separated text into an array, then emit one row per entry.
location_array = split(col('Locations'), ',')
split_df = employee_df.withColumn('location', explode(location_array))

# Keep the three columns of interest and drop every 'Bangalore' row.
final_df = split_df.select('empId', 'name', "location").where(col('location') != 'Bangalore')
final_df.show()

+-----+------+---------+
|empId|  name| location|
+-----+------+---------+
|    1|Gaurav|     Pune|
|    1|Gaurav|Hyderabad|
|    1|Rishab|     Pune|
|    1|Rishab|   Mumbai|
+-----+------+---------+



### Q.13 count the number of movies in each genre

In [0]:
# (movie name, list of genres)
data = [
    ('The Shawshank Redemption', ['Drama', 'Crime']),
    ('The Godfather', ['Drama', 'Crime']),
    ('Pulp Fiction', ['Drama', 'Crime', 'Thriller']),
    ('The Dark Knight', ['Drama', 'Crime', 'Thriller', 'Action']),
]

columns = ["name", "genres"]
movies = spark.createDataFrame(data, columns)

# explode() yields one (name, genre) row per array element; counting those
# rows per genre gives the number of movies in each genre.
one_row_per_genre = movies.select('name', explode(col('genres')).alias('genre'))
df = one_row_per_genre.groupBy('genre').count()
df.show()
                       

+--------+-----+
|   genre|count|
+--------+-----+
|   Crime|    4|
|   Drama|    4|
|Thriller|    2|
|  Action|    1|
+--------+-----+



### Q14. 
Write a PySpark query that repeats each number as many times as its own value.
eg : input : 1,2,3,5
output : 1,2,2,3,3,3,5,5,5,5,5

In [0]:

from pyspark.sql.functions import col, expr

columns = ["number"]
data = [(1,), (2,), (3,), (5,)]
df = spark.createDataFrame(data, columns)

# The largest input value bounds how many repetitions can ever be needed.
max_number = df.selectExpr("max(number) as max").first()["max"]

# seq runs from 1 through max_number (the end bound of range() is exclusive).
sequence_df = spark.range(1, max_number + 1).selectExpr("id as seq")

# Pair every number with every seq value and keep the pairs where
# seq <= number: a number n survives exactly n times.
df_repeated = df.crossJoin(sequence_df).where(col("number") >= col("seq"))
df_result = df_repeated.select(col("number").alias("repeated_number"))
df_result.show()


+---------------+
|repeated_number|
+---------------+
|              1|
|              2|
|              2|
|              3|
|              3|
|              3|
|              5|
|              5|
|              5|
|              5|
|              5|
+---------------+



In [0]:


from pyspark.sql.functions import expr, col

columns = ["number"]
data = [(1,), (2,), (3,), (5,)]
df = spark.createDataFrame(data, columns)

# Cast to int before the value is used as the repeat count below.
df_casted = df.withColumn("number", df["number"].cast("int"))

# array_repeat(number, number) builds an array holding the value 'number'
# times; explode() then turns every array element into its own row.
df_repeated = df_casted.selectExpr("explode(array_repeat(number, number)) as number")
df_repeated.show()



+------+
|number|
+------+
|     1|
|     2|
|     2|
|     3|
|     3|
|     3|
|     5|
|     5|
|     5|
|     5|
|     5|
+------+



In [0]:
# finding duplicates in dataframe

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# Explicit schema for the employee sample rows; every field is declared
# nullable (third argument True).
schema = StructType([
    StructField("emp_id", IntegerType(), True),
    StructField("emp_name", StringType(), True),
    StructField("emp_gender", StringType(), True),
    StructField("emp_age", IntegerType(), True),
    StructField("emp_salary", IntegerType(), True),
    StructField("emp_manager", StringType(), True)
])

# Sample rows containing deliberate exact duplicates: the emp_id 1 row
# appears three times, and the emp_id 3 and emp_id 4 rows appear twice each.
data = [
    (1, "Arjun Patel", "Male", 30, 60000, "Aarav Sharma"),
    (2, "Aarav Sharma", "Male", 28, 55000, "Zara Singh"),
    (3, "Zara Singh", "Female", 35, 70000, "Arjun Patel"),
    (4, "Priya Reddy", "Female", 32, 65000, "Aarav Sharma"),
    (1, "Arjun Patel", "Male", 30, 60000, "Aarav Sharma"),
    (6, "Naina Verma", "Female", 31, 72000, "Arjun Patel"),
    (1, "Arjun Patel", "Male", 30, 60000, "Aarav Sharma"),
    (4, "Priya Reddy", "Female", 32, 65000, "Aarav Sharma"),
    (5, "Aditya Kapoor", "Male", 28, 58000, "Zara Singh"),
    (10, "Anaya Joshi", "Female", 27, 59000, "Aarav Sharma"),
    (11, "Rohan Malhotra", "Male", 36, 73000, "Zara Singh"),
    (3, "Zara Singh", "Female", 35, 70000, "Arjun Patel")
]
# Approach 1 - groupBy: count how often each complete row occurs and keep
# the rows seen more than once (each duplicated row is listed a single time).
all_rows = spark.createDataFrame(data, schema=schema)
row_counts = all_rows.groupBy(*all_rows.columns).count()
df = row_counts.where(col('count') > 1).drop('count')
df.show()

# Approach 2 - window function: number the copies of each identical row and
# keep every copy after the first, so a row present three times is listed twice.
df_2 = spark.createDataFrame(data, schema=schema)
every_column = df_2.columns
window_spec = Window.partitionBy(*every_column).orderBy(*every_column)
df_2 = (
    df_2.withColumn('rn', row_number().over(window_spec))
    .where(col('rn') > 1)
    .drop('rn')
)
df_2.show()


+------+-----------+----------+-------+----------+------------+
|emp_id|   emp_name|emp_gender|emp_age|emp_salary| emp_manager|
+------+-----------+----------+-------+----------+------------+
|     1|Arjun Patel|      Male|     30|     60000|Aarav Sharma|
|     3| Zara Singh|    Female|     35|     70000| Arjun Patel|
|     4|Priya Reddy|    Female|     32|     65000|Aarav Sharma|
+------+-----------+----------+-------+----------+------------+

+------+-----------+----------+-------+----------+------------+
|emp_id|   emp_name|emp_gender|emp_age|emp_salary| emp_manager|
+------+-----------+----------+-------+----------+------------+
|     1|Arjun Patel|      Male|     30|     60000|Aarav Sharma|
|     1|Arjun Patel|      Male|     30|     60000|Aarav Sharma|
|     3| Zara Singh|    Female|     35|     70000| Arjun Patel|
|     4|Priya Reddy|    Female|     32|     65000|Aarav Sharma|
+------+-----------+----------+-------+----------+------------+



In [0]:
# dropping duplicates in a DataFrame

# Approach 1 - dropDuplicates(): keeps one copy of every distinct row.
df_3 = spark.createDataFrame(data, schema=schema).dropDuplicates()
df_3.show()

# Approach 2 - groupBy: grouping on all columns collapses identical rows
# into one group each; the helper 'count' column is then discarded.
df_4 = spark.createDataFrame(data, schema=schema)
df_4 = df_4.groupBy(*df_4.columns).count().drop('count')
df_4.show()

+------+--------------+----------+-------+----------+------------+
|emp_id|      emp_name|emp_gender|emp_age|emp_salary| emp_manager|
+------+--------------+----------+-------+----------+------------+
|     1|   Arjun Patel|      Male|     30|     60000|Aarav Sharma|
|     3|    Zara Singh|    Female|     35|     70000| Arjun Patel|
|     2|  Aarav Sharma|      Male|     28|     55000|  Zara Singh|
|     4|   Priya Reddy|    Female|     32|     65000|Aarav Sharma|
|     6|   Naina Verma|    Female|     31|     72000| Arjun Patel|
|     5| Aditya Kapoor|      Male|     28|     58000|  Zara Singh|
|    10|   Anaya Joshi|    Female|     27|     59000|Aarav Sharma|
|    11|Rohan Malhotra|      Male|     36|     73000|  Zara Singh|
+------+--------------+----------+-------+----------+------------+

+------+--------------+----------+-------+----------+------------+
|emp_id|      emp_name|emp_gender|emp_age|emp_salary| emp_manager|
+------+--------------+----------+-------+----------+--------