1. How to import PySpark and check the version?

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySparkex").getOrCreate()
print(spark.version)

24/09/24 11:45:16 WARN Utils: Your hostname, AI-CJB-LAP-460 resolves to a loopback address: 127.0.1.1; using 192.168.1.165 instead (on interface wlp0s20f3)
24/09/24 11:45:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/24 11:45:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


3.5.2


2. How to convert the index of a PySpark DataFrame into a column?

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import row_number
spark = SparkSession.builder \
    .appName("Convert Index to Column") \
    .getOrCreate()
df = spark.createDataFrame([
    ("Alice", 1),
    ("Bob", 2),
    ("Charlie", 3),
], ["Name", "Value"])
win= Window.orderBy("Name") 
df_with_index = df.withColumn("Index", row_number().over(win))
df_with_index.show()


24/09/24 11:45:18 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/09/24 11:45:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

+-------+-----+-----+
|   Name|Value|Index|
+-------+-----+-----+
|  Alice|    1|    1|
|    Bob|    2|    2|
|Charlie|    3|    3|
+-------+-----+-----+



3. How to combine many lists to form a PySpark DataFrame?

In [3]:
list1 = ["a", "b", "c", "d"]
list2 = [1, 2, 3, 4]
rdd=spark.sparkContext.parallelize(list(zip(list1,list2)))
df=rdd.toDF(["col1","col2"])
df.show()


+----+----+
|col1|col2|
+----+----+
|   a|   1|
|   b|   2|
|   c|   3|
|   d|   4|
+----+----+



4. How to get the items of list A not present in list B?

In [4]:
list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]
rdd1=spark.sparkContext.parallelize(list_A)
rdd2=spark.sparkContext.parallelize(list_B)
res=rdd1.subtract(rdd2)
res.collect()


                                                                                

[1, 2, 3]

5. How to get the items not common to both list A and list B?

In [5]:
list_A = [1, 2, 3, 4, 5]
list_B = [4, 5, 6, 7, 8]
rdd1=spark.sparkContext.parallelize(list_A)
rdd2=spark.sparkContext.parallelize(list_B)
res1=rdd1.subtract(rdd2)
res2=rdd2.subtract(rdd1)
res=res1.union(res2)
res.collect()


                                                                                

[1, 2, 3, 6, 7, 8]

6. How to get the minimum, 25th percentile, median, 75th, and max of a numeric column?

In [6]:
data = [("A", 10), ("B", 20), ("C", 30), ("D", 40), ("E", 50), ("F", 15), ("G", 28), ("H", 54), ("I", 41), ("J", 86)]
df = spark.createDataFrame(data, ["Name", "Age"])
df.show()

quantiles = df.approxQuantile("Age", [0.0, 0.25, 0.5, 0.75, 1.0], 0.01)

print("Min: ", quantiles[0])
print("25th percentile: ", quantiles[1])
print("Median: ", quantiles[2])
print("75th percentile: ", quantiles[3])
print("Max: ", quantiles[4])

+----+---+
|Name|Age|
+----+---+
|   A| 10|
|   B| 20|
|   C| 30|
|   D| 40|
|   E| 50|
|   F| 15|
|   G| 28|
|   H| 54|
|   I| 41|
|   J| 86|
+----+---+

Min:  10.0
25th percentile:  20.0
Median:  30.0
75th percentile:  50.0
Max:  86.0


7. How to get frequency counts of unique items of a column?

In [7]:
from pyspark.sql import Row

data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]
df = spark.createDataFrame(data)
df.groupBy("name").count().show()
df.groupBy("job").count().show()

+----+-----+
|name|count|
+----+-----+
|John|    2|
|Mary|    1|
| Bob|    3|
| Sam|    1|
+----+-----+

+---------+-----+
|      job|count|
+---------+-----+
| Engineer|    4|
|Scientist|    2|
|   Doctor|    1|
+---------+-----+



8. How to keep only top 2 most frequent values as it is and replace everything else as ‘Other’?

In [8]:
from pyspark.sql import Row
from pyspark.sql.functions import col, when
data = [
Row(name='John', job='Engineer'),
Row(name='John', job='Engineer'),
Row(name='Mary', job='Scientist'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Engineer'),
Row(name='Bob', job='Scientist'),
Row(name='Sam', job='Doctor'),
]
df = spark.createDataFrame(data)
top_2 = df.groupBy('job').count()\
    .orderBy('count', ascending=False)\
        .limit(2).select('job').rdd.flatMap(lambda x: x).collect()
df = df.withColumn('job', when(col('job')\
.isin(top_2), col('job')).otherwise('Other'))
df.show()

+----+---------+
|name|      job|
+----+---------+
|John| Engineer|
|John| Engineer|
|Mary|Scientist|
| Bob| Engineer|
| Bob| Engineer|
| Bob|Scientist|
| Sam|    Other|
+----+---------+



9. How to Drop rows with NA values specific to a particular column?

In [9]:
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])
df1=df.dropna(subset=["Value"])
df1.show()

+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|NULL|
|   B|    3| 456|
+----+-----+----+



10. How to rename columns of a PySpark DataFrame using two lists – one containing the old column names and the other containing the new column names?

In [10]:
df = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ["col1", "col2", "col3"])
old_names = ["col1", "col2", "col3"]
new_names = ["new_col1", "new_col2", "new_col3"]
df1 = df.toDF(*new_names)
df1.show()

+--------+--------+--------+
|new_col1|new_col2|new_col3|
+--------+--------+--------+
|       1|       2|       3|
|       4|       5|       6|
+--------+--------+--------+



11. How to bin a numeric list to 10 groups of equal size?

In [11]:
from pyspark.sql.functions import rand
from pyspark.sql import functions as F
num_items = 100
df = spark.range(num_items).select\
    (rand(seed=42).alias("values"))
min_value = df.agg(F.min("values")).first()[0]
max_value = df.agg(F.max("values")).first()[0]
bin_size = (max_value - min_value) / 10
binned_df = df.withColumn(
    "bucket",
    (F.col("values") - min_value) / bin_size
)
binned_df = binned_df.withColumn("bucket", F.ceil(F.col("bucket")))
print("Binned DataFrame:")
binned_df.show(5)


Binned DataFrame:
+-------------------+------+
|             values|bucket|
+-------------------+------+
|  0.619189370225301|     7|
| 0.5096018842446481|     6|
| 0.8325259388871524|     9|
|0.26322809041172357|     3|
| 0.6702867696264135|     7|
+-------------------+------+
only showing top 5 rows



12. How to create contigency table?

In [12]:
data = [("A", "X"), ("A", "Y"), ("A", "X"), ("B", "Y"), ("B", "X"), ("C", "X"), ("C", "X"), ("C", "Y")]
df = spark.createDataFrame(data, ["category1", "category2"])
df.crosstab("category1", "category2").show()



+-------------------+---+---+
|category1_category2|  X|  Y|
+-------------------+---+---+
|                  B|  1|  1|
|                  C|  2|  1|
|                  A|  2|  1|
+-------------------+---+---+



13. How to find the numbers that are multiples of 3 from a column?

In [13]:
from pyspark.sql.functions import rand
df = spark.range(10)
df = df.withColumn("random", ((rand(seed=42) * 10) + 1).cast("int"))
df1=df.withColumn("is_multiple_of_3", when(col("random") % 3 == 0,"yes").otherwise('no'))
df1.show()

+---+------+----------------+
| id|random|is_multiple_of_3|
+---+------+----------------+
|  0|     7|              no|
|  1|     9|             yes|
|  2|     8|              no|
|  3|     8|              no|
|  4|     3|             yes|
|  5|     1|              no|
|  6|     7|              no|
|  7|     4|              no|
|  8|     5|              no|
|  9|     1|              no|
+---+------+----------------+



14. How to extract items at given positions from a column?

In [14]:
from pyspark.sql.functions import rand

df = df.withColumn("random", ((rand(seed=42) * 10) + 1).cast("int"))

pos = [0, 4, 8, 5]
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

pos = [0, 4, 8, 5]

# Define window specification
w = Window.orderBy(monotonically_increasing_id())

# Add index
df = df.withColumn("index", row_number().over(w) - 1)

df.show()

# Filter the DataFrame based on the specified positions
df_filtered = df.filter(df.index.isin(pos))

df_filtered.show()

24/09/24 11:45:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---+------+-----+
| id|random|index|
+---+------+-----+
|  0|     7|    0|
|  1|     9|    1|
|  2|     8|    2|
|  3|     8|    3|
|  4|     3|    4|
|  5|     1|    5|
|  6|     7|    6|
|  7|     4|    7|
|  8|     5|    8|
|  9|     1|    9|
+---+------+-----+

+---+------+-----+
| id|random|index|
+---+------+-----+
|  0|     7|    0|
|  4|     3|    4|
|  5|     1|    5|
|  8|     5|    8|
+---+------+-----+



24/09/24 11:45:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


15. How to stack two DataFrames vertically ?

In [15]:
df_A = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 10), ("orange", 2, 8)], ["Name", "Col_1", "Col_2"])
df_B = spark.createDataFrame([("apple", 3, 5), ("banana", 1, 15), ("grape", 4, 6)], ["Name", "Col_1", "Col_3"])
df_A.union(df_B).show()

+------+-----+-----+
|  Name|Col_1|Col_2|
+------+-----+-----+
| apple|    3|    5|
|banana|    1|   10|
|orange|    2|    8|
| apple|    3|    5|
|banana|    1|   15|
| grape|    4|    6|
+------+-----+-----+



16. How to compute the mean squared error on a truth and predicted columns?

In [16]:
from pyspark.sql import functions as F
data = [(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]
df = spark.createDataFrame(data, ["actual", "predicted"])
df.select(F.mean((F.col("actual") - F.col("predicted")) ** 2).alias("MSE")).show()

+-----+
|  MSE|
+-----+
|116.8|
+-----+



17. How to convert the first character of each element in a series to uppercase?

In [17]:
from pyspark.sql.functions import initcap
data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])

df = df.withColumn("name", initcap(df.name))
df.show()

+-----+
| name|
+-----+
| John|
|Alice|
|  Bob|
+-----+



18. How to compute summary statistics for all columns in a dataframe

In [18]:
data = [('James', 34, 55000),
('Michael', 30, 70000),
('Robert', 37, 60000),
('Maria', 29, 80000),
('Jen', 32, 65000)]

df = spark.createDataFrame(data, ["name", "age" , "salary"])
df.summary().show()


24/09/24 11:45:36 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------+-----------------+-----------------+
|summary|  name|              age|           salary|
+-------+------+-----------------+-----------------+
|  count|     5|                5|                5|
|   mean|  NULL|             32.4|          66000.0|
| stddev|  NULL|3.209361307176242|9617.692030835675|
|    min| James|               29|            55000|
|    25%|  NULL|               30|            60000|
|    50%|  NULL|               32|            65000|
|    75%|  NULL|               34|            70000|
|    max|Robert|               37|            80000|
+-------+------+-----------------+-----------------+




19. How to calculate the number of characters in each word in a column?

In [19]:
from pyspark.sql import functions as F
data = [("john",), ("alice",), ("bob",)]
df = spark.createDataFrame(data, ["name"])
df = df.withColumn('len', F.length(df.name))
df.show()

+-----+---+
| name|len|
+-----+---+
| john|  4|
|alice|  5|
|  bob|  3|
+-----+---+



In [20]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder \
    .appName("Difference of Differences") \
    .getOrCreate()
data = [('James', 34, 55000),
        ('Michael', 30, 70000),
        ('Robert', 37, 60000),
        ('Maria', 29, 80000),
        ('Jen', 32, 65000)]
df = spark.createDataFrame(data, ["Name", "Age", "Salary"])
window_spec = Window.orderBy("Salary")
df_first_diff = df.withColumn("first_diff", F.col("Salary") - F.lag("Salary").over(window_spec))
df_second_diff = df_first_diff.withColumn("second_diff", 
    F.col("first_diff") - F.lag("first_diff").over(window_spec))
df_second_diff.show()

24/09/24 11:45:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/09/24 11:45:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------+---+------+----------+-----------+
|   Name|Age|Salary|first_diff|second_diff|
+-------+---+------+----------+-----------+
|  James| 34| 55000|      NULL|       NULL|
| Robert| 37| 60000|      5000|       NULL|
|    Jen| 32| 65000|      5000|          0|
|Michael| 30| 70000|      5000|          0|
|  Maria| 29| 80000|     10000|       5000|
+-------+---+------+----------+-----------+



24/09/24 11:45:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


21. How to get the day of month, week number, day of year and day of week from a date strings?

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import to_date, dayofmonth, weekofyear, dayofyear, dayofweek
spark = SparkSession.builder \
    .appName("dateex") \
    .getOrCreate()
data = [("2023-05-18", "01 Jan 2010"), ("2023-12-31", "01 Jan 2010")]
df = spark.createDataFrame(data, ["date_str_1", "date_str_2"])

df = df.withColumn("date_1", to_date(df.date_str_1, 'yyyy-MM-dd'))
df = df.withColumn("date_2", to_date(df.date_str_2, 'dd MMM yyyy'))

df = df.withColumn("day_of_month", dayofmonth(df.date_1))\
.withColumn("week_number", weekofyear(df.date_1))\
.withColumn("day_of_year", dayofyear(df.date_1))\
.withColumn("day_of_week", dayofweek(df.date_1))

df.show()


24/09/24 11:45:38 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+----------+-----------+----------+----------+------------+-----------+-----------+-----------+
|date_str_1| date_str_2|    date_1|    date_2|day_of_month|week_number|day_of_year|day_of_week|
+----------+-----------+----------+----------+------------+-----------+-----------+-----------+
|2023-05-18|01 Jan 2010|2023-05-18|2010-01-01|          18|         20|        138|          5|
|2023-12-31|01 Jan 2010|2023-12-31|2010-01-01|          31|         52|        365|          1|
+----------+-----------+----------+----------+------------+-----------+-----------+-----------+



22. How to convert year-month string to dates corresponding to the 4th day of the month?

In [22]:
from pyspark.sql.functions import expr, col
df = spark.createDataFrame([('Jan 2010',), ('Feb 2011',), ('Mar 2012',)], ['MonthYear'])
df = df.withColumn('Date', expr("to_date(MonthYear, 'MMM yyyy')"))

df = df.withColumn('Date', expr("date_add(date_sub(Date, day(Date) - 1), 3)"))

df.show()


+---------+----------+
|MonthYear|      Date|
+---------+----------+
| Jan 2010|2010-01-04|
| Feb 2011|2011-02-04|
| Mar 2012|2012-03-04|
+---------+----------+



23 How to filter words that contain atleast 2 vowels from a series?

In [23]:
df = spark.createDataFrame([('Apple',), ('Orange',), ('Plan',) , ('Python',) , ('Money',)], ['Word'])
from pyspark.sql.functions import col, length, translate

df2 = df.filter(F.col("word").rlike(r'[aeiouAEIOU].*[aeiouAEIOU]'))
df2.show()

+------+
|  Word|
+------+
| Apple|
|Orange|
| Money|
+------+



24. How to filter valid emails from a list?

In [24]:
data = ['buying books at amazom.com', 'rameses@egypt.com', 'matt@t.co', 'narendra@modi.com']
df = spark.createDataFrame(data, "string")
pattern = '[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,4}'
df2=df.filter(F.col("value").rlike(pattern))
df2.show(truncate =False)

+-----------------+
|value            |
+-----------------+
|rameses@egypt.com|
|matt@t.co        |
|narendra@modi.com|
+-----------------+



25. How to Pivot PySpark DataFrame?


In [25]:
data = [
(2021, 1, "US", 5000),
(2021, 1, "EU", 4000),
(2021, 2, "US", 5500),
(2021, 2, "EU", 4500),
(2021, 3, "US", 6000),
(2021, 3, "EU", 5000),
(2021, 4, "US", 7000),
(2021, 4, "EU", 6000),
]
columns = ["year", "quarter", "region", "revenue"]
df = spark.createDataFrame(data, columns)
pivot_df=df.groupBy("year","region").pivot("quarter").agg(F.mean("revenue"))
pivot_df.show()


+----+------+------+------+------+------+
|year|region|     1|     2|     3|     4|
+----+------+------+------+------+------+
|2021|    US|5000.0|5500.0|6000.0|7000.0|
|2021|    EU|4000.0|4500.0|5000.0|6000.0|
+----+------+------+------+------+------+



26. How to get the mean of a variable grouped by another variable?

In [26]:
data = [("1001", "Laptop", 1000),
("1002", "Mouse", 50),
("1003", "Laptop", 1200),
("1004", "Mouse", 30),
("1005", "Smartphone", 700)]
columns = ["OrderID", "Product", "Price"]
df = spark.createDataFrame(data, columns)
df2=df.groupBy("Product").agg(F.mean("Price"))
df2.show()

+----------+----------+
|   Product|avg(Price)|
+----------+----------+
|    Laptop|    1100.0|
|     Mouse|      40.0|
|Smartphone|     700.0|
+----------+----------+



27. How to compute the euclidean distance between two columns?

In [27]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
    .appName("Euclideanex") \
    .getOrCreate()
data = [(1, 10), (2, 9), (3, 8), (4, 7), (5, 6), (6, 5), (7, 4), (8, 3), (9, 2), (10, 1)]
df = spark.createDataFrame(data, ["series1", "series2"])
df = df.withColumn("euclidean_distance", F.sqrt((F.col("series1") - F.col("series2"))**2))
df.show()

24/09/24 11:45:41 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-------+-------+------------------+
|series1|series2|euclidean_distance|
+-------+-------+------------------+
|      1|     10|               9.0|
|      2|      9|               7.0|
|      3|      8|               5.0|
|      4|      7|               3.0|
|      5|      6|               1.0|
|      6|      5|               1.0|
|      7|      4|               3.0|
|      8|      3|               5.0|
|      9|      2|               7.0|
|     10|      1|               9.0|
+-------+-------+------------------+



28. How to replace missing spaces in a string with the least frequent character?

In [28]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from collections import Counter

df = spark.createDataFrame([('dbc deb abed gade',)], ["string"])
def least_frequent_char(s):
    s = s.replace(" ", "")  
    if not s:  
        return None
    return min(Counter(s), key=Counter(s).get)
least_f = F.udf(least_frequent_char)
least_char = df.select(least_f(F.col("string")).alias("least_char")).first()[0]
if least_char:
    df = df.withColumn("replaced_string", F.regexp_replace(F.col("string"), " ", least_char))

df.show(truncate=False)

+-----------------+-----------------+
|string           |replaced_string  |
+-----------------+-----------------+
|dbc deb abed gade|dbccdebcabedcgade|
+-----------------+-----------------+



29. How to create a TimeSeries starting ‘2000-01-01’ and 10 weekends (saturdays) after that having random numbers as values?

In [29]:
from pyspark.sql.functions import expr, explode, sequence, rand

# Start date and end date (start + 10 weekends)
start_date = '2000-01-01'
end_date = '2000-03-04' 
df = spark.range(1).select(
explode(
sequence(
expr(f"date '{start_date}'"),
expr(f"date '{end_date}'"),
expr("interval 1 day")
)
).alias("date")
)
df = df.filter(expr("dayofweek(date) = 7")) 
df = df.withColumn("random_numbers", ((rand(seed=42) * 10) + 1).cast("int"))

df.show()

+----------+--------------+
|      date|random_numbers|
+----------+--------------+
|2000-01-01|             5|
|2000-01-08|             1|
|2000-01-15|             9|
|2000-01-22|             6|
|2000-01-29|             3|
|2000-02-05|             2|
|2000-02-12|             3|
|2000-02-19|             2|
|2000-02-26|             6|
|2000-03-04|             4|
+----------+--------------+



30. How to get the nrows, ncolumns, datatype of a dataframe?


In [30]:
from pyspark import SparkFiles

# Load the dataset
url = "https://raw.githubusercontent.com/selva86/datasets/master/Churn_Modelling.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("Churn_Modelling.csv"), header=True, inferSchema=True)

# Get the number of rows and columns
nrows = df.count()
ncolumns = len(df.columns)

# Get the data types
data_types = df.dtypes

# Get summary statistics
summary_stats = df.describe().show()

# Convert to NumPy array and list
numpy_array = df.toPandas().values
list_equiv = df.collect()

# Output the results
print(f"Number of rows: {nrows}")
print(f"Number of columns: {ncolumns}")




+-------+------------------+-----------------+-------+-----------------+---------+------+------------------+------------------+-----------------+------------------+-------------------+-------------------+-----------------+-------------------+
|summary|         RowNumber|       CustomerId|Surname|      CreditScore|Geography|Gender|               Age|            Tenure|          Balance|     NumOfProducts|          HasCrCard|     IsActiveMember|  EstimatedSalary|             Exited|
+-------+------------------+-----------------+-------+-----------------+---------+------+------------------+------------------+-----------------+------------------+-------------------+-------------------+-----------------+-------------------+
|  count|             10000|            10000|  10000|            10000|    10000| 10000|             10000|             10000|            10000|             10000|              10000|              10000|            10000|              10000|
|   mean|            5000.5|

31. How to rename a specific columns in a dataframe?

In [31]:
df = spark.createDataFrame([('Alice', 1, 30),('Bob', 2, 35)], ["name", "age", "qty"])

old_names = ["qty", "age"]
new_names = ["user_qty", "user_age"]
for old_name, new_name in zip(old_names, new_names):
    df = df.withColumnRenamed(old_name, new_name)

df.show()


+-----+--------+--------+
| name|user_age|user_qty|
+-----+--------+--------+
|Alice|       1|      30|
|  Bob|       2|      35|
+-----+--------+--------+



32. How to check if a dataframe has any missing values and count of missing values in each column?

In [32]:
from pyspark.sql.functions import col, sum
df = spark.createDataFrame([
("A", 1, None),
("B", None, "123" ),
("B", 3, "456"),
("D", None, None),
], ["Name", "Value", "id"])
missing = df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns))
has_missing = any(row.asDict().values() for row in missing.collect())

missing_count = missing.collect()[0].asDict()
print(missing_count)

df.show()

{'Name': 0, 'Value': 2, 'id': 2}
+----+-----+----+
|Name|Value|  id|
+----+-----+----+
|   A|    1|NULL|
|   B| NULL| 123|
|   B|    3| 456|
|   D| NULL|NULL|
+----+-----+----+



33 How to replace missing values of multiple numeric columns with the mean?

In [33]:
from pyspark.sql import functions as F

df = spark.createDataFrame([
    ("A", 1, None),
    ("B", None, 123),
    ("B", 3, 456),
    ("D", 6, None),
], ["Name", "var1", "var2"])

mean_values = df.select([F.mean(column).alias(column) for column in ["var1", "var2"]]).collect()[0]

df2 = df.na.fill({
    "var1": mean_values["var1"],
    "var2": mean_values["var2"]
})

df2.show()


+----+----+----+
|Name|var1|var2|
+----+----+----+
|   A|   1| 289|
|   B|   3| 123|
|   B|   3| 456|
|   D|   6| 289|
+----+----+----+



34. How to change the order of columns of a dataframe?

In [34]:

data = [("John", "Doe", 30), ("Jane", "Doe", 25), ("Alice", "Smith", 22)]
df = spark.createDataFrame(data, ["First_Name", "Last_Name", "Age"])
new_order = ["Age", "First_Name", "Last_Name"]
df = df.select(*new_order)
df.show()

+---+----------+---------+
|Age|First_Name|Last_Name|
+---+----------+---------+
| 30|      John|      Doe|
| 25|      Jane|      Doe|
| 22|     Alice|    Smith|
+---+----------+---------+



35. How to format or suppress scientific notations in a PySpark DataFrame?

In [35]:
df = spark.createDataFrame([(1, 0.000000123), (2, 0.000023456), (3, 0.000345678)], ["id", "your_column"])
from pyspark.sql.functions import format_number

# Determine the number of decimal places you want
decimal_places = 10

df = df.withColumn("your_column", format_number("your_column", decimal_places))
df.show()


+---+------------+
| id| your_column|
+---+------------+
|  1|0.0000001230|
|  2|0.0000234560|
|  3|0.0003456780|
+---+------------+



36. How to format all the values in a dataframe as percentages?

In [36]:
from pyspark.sql.functions import concat, col, lit
data = [(0.1, .08), (0.2, .06), (0.33, .02)]
df = spark.createDataFrame(data, ["numbers_1", "numbers_2"])
columns = ["numbers_1", "numbers_2"]

for col_name in columns:
 df = df.withColumn(col_name, concat((col(col_name) * 100).cast('decimal(10, 2)'), lit("%")))
 df.show()

+---------+---------+
|numbers_1|numbers_2|
+---------+---------+
|   10.00%|     0.08|
|   20.00%|     0.06|
|   33.00%|     0.02|
+---------+---------+

+---------+---------+
|numbers_1|numbers_2|
+---------+---------+
|   10.00%|    8.00%|
|   20.00%|    6.00%|
|   33.00%|    2.00%|
+---------+---------+



37. How to filter every nth row in a dataframe?

In [37]:
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Dave", 4), ("Eve", 5),
("Frank", 6), ("Grace", 7), ("Hannah", 8), ("Igor", 9), ("Jack", 10)]

df = spark.createDataFrame(data, ["Name", "Number"])
window = Window.orderBy(monotonically_increasing_id())
df = df.withColumn("nth", row_number().over(window))
n = 3
df = df.filter((df.nth % n) == 0)

df.show()

24/09/24 11:45:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------+------+---+
|   Name|Number|nth|
+-------+------+---+
|Charlie|     3|  3|
|  Frank|     6|  6|
|   Igor|     9|  9|
+-------+------+---+



38 How to get the row number of the nth largest value in a column?

In [38]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, row_number
data = [
Row(id=1, column1=5),
Row(id=2, column1=8),
Row(id=3, column1=12),
Row(id=4, column1=1),
Row(id=5, column1=15),
Row(id=6, column1=7),
]

df = spark.createDataFrame(data)
window = Window.orderBy(desc("column1"))
df = df.withColumn("row_number", row_number().over(window))
n = 5 
row = df.filter(df.row_number == n).first()
if row:
    print("Row number:", row.row_number)
    print("Column value:", row.column1)


24/09/24 11:45:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Row number: 5
Column value: 5


39. How to get the last n rows of a dataframe with row sum > 100?

In [39]:
data = [(10, 25, 70),
(40, 5, 20),
(70, 80, 100),
(10, 2, 60),
(40, 50, 20)]
df = spark.createDataFrame(data, ["col1", "col2", "col3"])
sumdf = df.withColumn("sum", F.col("col1") + F.col("col2")+F.col("col3"))
fdf = sumdf.filter(F.col("sum") > 100)
n = 3
res = fdf.orderBy(F.desc("col1")).limit(n)

res.show()

+----+----+----+---+
|col1|col2|col3|sum|
+----+----+----+---+
|  70|  80| 100|250|
|  40|  50|  20|110|
|  10|  25|  70|105|
+----+----+----+---+




40. How to create a column containing the minimum by maximum of each row?

In [40]:
from pyspark.sql.functions import udf, array
from pyspark.sql.types import FloatType
data = [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12)]
df = spark.createDataFrame(data, ["col1", "col2", "col3"])
def minmax(row):
    return float(min(row)) / max(row)

minmaxudf = udf(minmax, FloatType())
df = df.withColumn('min_by_max', minmaxudf(array(df.columns)))

df.show()

+----+----+----+----------+
|col1|col2|col3|min_by_max|
+----+----+----+----------+
|   1|   2|   3|0.33333334|
|   4|   5|   6| 0.6666667|
|   7|   8|   9| 0.7777778|
|  10|  11|  12| 0.8333333|
+----+----+----+----------+



41. How to create a column that contains the penultimate value in each row?

In [41]:
from pyspark.sql.functions import array, col, expr
data = [(10, 20, 30),
(40, 60, 50),
(80, 70, 90)]

df = spark.createDataFrame(data, ["Column1", "Column2", "Column3"])

df_pu = df.withColumn(
    'Penultimate',
    expr("element_at(array(Column1, Column2, Column3), -2)")
)

df_pu.show()


+-------+-------+-------+-----------+
|Column1|Column2|Column3|Penultimate|
+-------+-------+-------+-----------+
|     10|     20|     30|         20|
|     40|     60|     50|         60|
|     80|     70|     90|         70|
+-------+-------+-------+-----------+



42. How to normalize all columns in a dataframe?


In [42]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, min, max

spark = SparkSession.builder.appName("Normalization Df").getOrCreate()

data = [(1, 2, 3),
        (2, 3, 4),
        (3, 4, 5),
        (4, 5, 6)]

df = spark.createDataFrame(data, ["Col1", "Col2", "Col3"])
df.show()

for col_name in df.columns:
    min_value = df.select(min(col(col_name))).collect()[0][0]
    max_value = df.select(max(col(col_name))).collect()[0][0]
    df = df.withColumn(col_name, (col(col_name) - min_value) / (max_value - min_value))

df.show()



24/09/24 11:45:52 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+----+----+----+
|Col1|Col2|Col3|
+----+----+----+
|   1|   2|   3|
|   2|   3|   4|
|   3|   4|   5|
|   4|   5|   6|
+----+----+----+

+------------------+------------------+------------------+
|              Col1|              Col2|              Col3|
+------------------+------------------+------------------+
|               0.0|               0.0|               0.0|
|0.3333333333333333|0.3333333333333333|0.3333333333333333|
|0.6666666666666666|0.6666666666666666|0.6666666666666666|
|               1.0|               1.0|               1.0|
+------------------+------------------+------------------+



43. How to get the positions where values of two columns match?

In [43]:
from pyspark.sql.functions import when
from pyspark.sql.functions import col
data = [("John", "John"), ("Lily", "Lucy"), ("Sam", "Sam"), ("Lucy", "Lily")]
df = spark.createDataFrame(data, ["Name1", "Name2"])
df = df.withColumn("Match", when(col("Name1") == col("Name2"), True).otherwise(False))

df.show()

+-----+-----+-----+
|Name1|Name2|Match|
+-----+-----+-----+
| John| John| true|
| Lily| Lucy|false|
|  Sam|  Sam| true|
| Lucy| Lily|false|
+-----+-----+-----+



44. How to create lags and leads of a column by group in a dataframe?

In [44]:
from pyspark.sql.functions import lag, lead, to_date
from pyspark.sql.window import Window

data = [("2023-01-01", "Store1", 100),
("2023-01-02", "Store1", 150),
("2023-01-03", "Store1", 200),
("2023-01-04", "Store1", 250),
("2023-01-05", "Store1", 300),
("2023-01-01", "Store2", 50),
("2023-01-02", "Store2", 60),
("2023-01-03", "Store2", 80),
("2023-01-04", "Store2", 90),
("2023-01-05", "Store2", 120)]

df = spark.createDataFrame(data, ["Date", "Store", "Sales"])
df = df.withColumn("Date", to_date(df.Date, 'yyyy-MM-dd'))
windowSpec = Window.partitionBy("Store").orderBy("Date")
df = df.withColumn("Lag_Sales", lag(df["Sales"]).over(windowSpec))
df = df.withColumn("Lead_Sales", lead(df["Sales"]).over(windowSpec))

df.show()

+----------+------+-----+---------+----------+
|      Date| Store|Sales|Lag_Sales|Lead_Sales|
+----------+------+-----+---------+----------+
|2023-01-01|Store1|  100|     NULL|       150|
|2023-01-02|Store1|  150|      100|       200|
|2023-01-03|Store1|  200|      150|       250|
|2023-01-04|Store1|  250|      200|       300|
|2023-01-05|Store1|  300|      250|      NULL|
|2023-01-01|Store2|   50|     NULL|        60|
|2023-01-02|Store2|   60|       50|        80|
|2023-01-03|Store2|   80|       60|        90|
|2023-01-04|Store2|   90|       80|       120|
|2023-01-05|Store2|  120|       90|      NULL|
+----------+------+-----+---------+----------+



45. How to get the frequency of unique values in the entire dataframe?


In [45]:
data = [(1, 2, 3),
(2, 3, 4),
(1, 2, 3),
(4, 5, 6),
(2, 3, 4)]
df = spark.createDataFrame(data, ["Column1", "Column2", "Column3"])
columns = df.columns
df_single = None

for c in columns:
    if df_single is None:
        df_single = df.select(col(c).alias("single_column"))
    else:
        df_single = df_single.union(df.select(col(c).alias("single_column")))
frequency_table = df_single.groupBy("single_column").count().orderBy('count', ascending=False)
frequency_table.show()

+-------------+-----+
|single_column|count|
+-------------+-----+
|            2|    4|
|            3|    4|
|            4|    3|
|            1|    2|
|            5|    1|
|            6|    1|
+-------------+-----+



                                                                                

46. How to replace both the diagonals of dataframe with 0?

In [46]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql.functions import when, col
data = [(1, 2, 3, 4),
(2, 3, 4, 5),
(1, 2, 3, 4),
(4, 5, 6, 7)]

df = spark.createDataFrame(data, ["col_1", "col_2", "col_3", "col_4"])
w = Window.orderBy(monotonically_increasing_id())
df = df.withColumn("id", row_number().over(w) - 1)
df = df.select([when(col("id") == i, 0).otherwise(col("col_"+str(i+1))).alias("col_"+str(i+1)) for i in range(4)])
df = df.withColumn("id", row_number().over(w) - 1)
df = df.withColumn("id_2", df.count() - 1 - df["id"])
dfdia = df.select([when(col("id_2") == i, 0).otherwise(col("col_"+str(i+1))).alias("col_"+str(i+1)) for i in range(4)])
dfdia.show()

24/09/24 11:45:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|    0|    2|    3|    0|
|    2|    0|    0|    5|
|    1|    0|    0|    4|
|    0|    5|    6|    0|
+-----+-----+-----+-----+



24/09/24 11:45:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


47. How to reverse the rows of a dataframe?

In [47]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id
data = [(1, 2, 3, 4),
(2, 3, 4, 5),
(3, 4, 5, 6),
(4, 5, 6, 7)]
df = spark.createDataFrame(data, ["col_1", "col_2", "col_3", "col_4"])
w = Window.orderBy(monotonically_increasing_id())
df = df.withColumn("id", row_number().over(w) - 1)
df2 = df.orderBy("id", ascending=False).drop("id")

df2.show()

24/09/24 11:45:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/24 11:45:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|    4|    5|    6|    7|
|    3|    4|    5|    6|
|    2|    3|    4|    5|
|    1|    2|    3|    4|
+-----+-----+-----+-----+



48. How to create one-hot encodings of a categorical variable (dummy variables)?

In [50]:


from pyspark.ml.feature import StringIndexer, OneHotEncoder
data = [("A", 10),("A", 20),("B", 30),("B", 20),("B", 30),("C", 40),("C", 10),("D", 10)]
df = spark.createDataFrame(data, ["Categories", "Value"])
indexer = StringIndexer(inputCol="Categories", outputCol="Categories_Indexed")
indexerModel = indexer.fit(df).transform(df)


encoder = OneHotEncoder(inputCol="Categories_Indexed", outputCol="Categories_onehot")
encoded_df = encoder.fit(indexerModel).transform(indexerModel)

encoded_df.show(truncate=False)

+----------+-----+------------------+-----------------+
|Categories|Value|Categories_Indexed|Categories_onehot|
+----------+-----+------------------+-----------------+
|A         |10   |1.0               |(3,[1],[1.0])    |
|A         |20   |1.0               |(3,[1],[1.0])    |
|B         |30   |0.0               |(3,[0],[1.0])    |
|B         |20   |0.0               |(3,[0],[1.0])    |
|B         |30   |0.0               |(3,[0],[1.0])    |
|C         |40   |2.0               |(3,[2],[1.0])    |
|C         |10   |2.0               |(3,[2],[1.0])    |
|D         |10   |3.0               |(3,[],[])        |
+----------+-----+------------------+-----------------+



49. How to Pivot the dataframe (converting rows into columns) ?

In [76]:
data = [
(2021, 1, "US", 5000),
(2021, 1, "EU", 4000),
(2021, 2, "US", 5500),
(2021, 2, "EU", 4500),
(2021, 3, "US", 6000),
(2021, 3, "EU", 5000),
(2021, 4, "US", 7000),
(2021, 4, "EU", 6000),
]
columns = ["year", "quarter", "region", "revenue"]
df = spark.createDataFrame(data, columns)
pivot_df = df.groupBy("year", "quarter").pivot("region").sum("revenue")

pivot_df.show()

+----+-------+----+----+
|year|quarter|  EU|  US|
+----+-------+----+----+
|2021|      2|4500|5500|
|2021|      1|4000|5000|
|2021|      3|5000|6000|
|2021|      4|6000|7000|
+----+-------+----+----+



50. How to UnPivot the dataframe (converting columns into rows) ?

In [77]:
data = [(2021, 2, 4500, 5500),
(2021, 1, 4000, 5000),
(2021, 3, 5000, 6000),
(2021, 4, 6000, 7000)]
columns = ["year", "quarter", "EU", "US"]
df = spark.createDataFrame(data, columns)
unpivotExpr = "stack(2, 'EU',EU, 'US', US) as (region,revenue)"
unPivotDF = pivot_df.select("year","quarter", expr(unpivotExpr)).where("revenue is not null")
unPivotDF.show()

+----+-------+------+-------+
|year|quarter|region|revenue|
+----+-------+------+-------+
|2021|      2|    EU|   4500|
|2021|      2|    US|   5500|
|2021|      1|    EU|   4000|
|2021|      1|    US|   5000|
|2021|      3|    EU|   5000|
|2021|      3|    US|   6000|
|2021|      4|    EU|   6000|
|2021|      4|    US|   7000|
+----+-------+------+-------+



24/09/24 11:18:26 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1714533 ms exceeds timeout 120000 ms
24/09/24 11:18:26 WARN SparkContext: Killing executors is not supported by current scheduler.
24/09/24 11:18:32 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

51. How to impute missing values with Zero?

In [51]:
df = spark.createDataFrame([(1, None), (None, 2), (3, 4), (5, None)], ["a", "b"])
df2=df.fillna(0)
df2.show()

+---+---+
|  a|  b|
+---+---+
|  1|  0|
|  0|  2|
|  3|  4|
|  5|  0|
+---+---+



52. How to identify continuous variables in a dataframe and create a list of those column names?

In [54]:
from pyspark.sql.types import DoubleType, FloatType
url = "https://raw.githubusercontent.com/selva86/datasets/master/Churn_Modelling_m.csv"
spark.sparkContext.addFile(url)

df = spark.read.csv(SparkFiles.get("Churn_Modelling_m.csv"), header=True, inferSchema=True)
cont_cols = [col for col, dtype in df.dtypes if dtype in (DoubleType, FloatType)]
df.show(2, truncate=False)


+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|Surname |CreditScore|Geography|Gender|Age|Tenure|Balance |NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|1        |15634602  |Hargrave|619        |France   |Female|42 |2     |0.0     |1            |1        |1             |101348.88      |1     |
|2        |15647311  |Hill    |608        |Spain    |Female|41 |1     |83807.86|1            |0        |1             |112542.58      |0     |
+---------+----------+--------+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
only showing top 2 rows



24/09/24 11:54:38 WARN SparkContext: The path https://raw.githubusercontent.com/selva86/datasets/master/Churn_Modelling_m.csv has been added already. Overwriting of added paths is not supported in the current version.


54. How to find installed location of Apache Spark and PySpark?

In [55]:
import pyspark
print(pyspark.__file__)

/home/ai/.local/lib/python3.10/site-packages/pyspark/__init__.py


55. How to convert a column to lower case using UDF?

In [61]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
data = [('John Doe', 'NEW YORK'),
('Jane Doe', 'LOS ANGELES'),
('Mike Johnson', 'CHICAGO'),
('Sara Smith', 'SAN FRANCISCO')]

df = spark.createDataFrame(data, ['Name', 'City'])
def to_lc(s):
    if s is not None:
        return s.lower()
udf_to_lc = udf(to_lower, StringType())
df = df.withColumn('city_lc', udf_to_lc(df['City']))

df.show()

+------------+-------------+-------------+
|        Name|         City|      city_lc|
+------------+-------------+-------------+
|    John Doe|     NEW YORK|     new york|
|    Jane Doe|  LOS ANGELES|  los angeles|
|Mike Johnson|      CHICAGO|      chicago|
|  Sara Smith|SAN FRANCISCO|san francisco|
+------------+-------------+-------------+



56. How to convert PySpark data frame to pandas dataframe?

In [64]:
data = [('John Doe', 'NEW YORK'),
('Jane Doe', 'LOS ANGELES'),
('Mike Johnson', 'CHICAGO'),
('Sara Smith', 'SAN FRANCISCO')]
pysparkDF = spark.createDataFrame(data, ['Name', 'City'])
pandasdf= pysparkDF.toPandas()
pandasdf

Unnamed: 0,Name,City
0,John Doe,NEW YORK
1,Jane Doe,LOS ANGELES
2,Mike Johnson,CHICAGO
3,Sara Smith,SAN FRANCISCO


57. How to View PySpark Cluster Details?

In [65]:
print(spark.sparkContext.uiWebUrl)

http://192.168.1.165:4040


58. How to View PySpark Cluster Configuration Details?

In [67]:
for k,v in spark.sparkContext.getConf().getAll():
    print(f"{k} : {v}")

spark.driver.extraJavaOptions : -Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false
spark.executor.id : driver
spark.app.startTime : 1727158516384
spark.app.name : PySparkex
spark.app.id : local-1

59. How to restrict the PySpark to use the number of cores in the system?

In [73]:
from pyspark.conf import SparkConf

conf = SparkConf().set("spark.cores.max", 4)
sc = SparkContext.getOrCreate(conf=conf)

60. How to cache PySpark DataFrame or objects and delete cache?

In [77]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Cache Example").getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "value"])
df.cache()
df.count()
df.collect()

[Row(id=1, value='a'), Row(id=2, value='b'), Row(id=3, value='c')]

61. How to Divide a PySpark DataFrame randomly in a given ratio (0.8, 0.2)?

In [78]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Random Split").getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")], ["id", "value"])
df_train, df_test = df.randomSplit([0.8, 0.2], seed=42)

print("Training DataFrame:")
print(df_train.count())
print(df_train.show())

print("Testing DataFrame:")
print(df_test.count())
print(df_test.show())

24/09/24 12:21:07 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Training DataFrame:
4
+---+-----+
| id|value|
+---+-----+
|  2|    b|
|  3|    c|
|  4|    d|
|  5|    e|
+---+-----+

None
Testing DataFrame:
1
+---+-----+
| id|value|
+---+-----+
|  1|    a|
+---+-----+

None


62. How to build logistic regression in PySpark?

63. How to convert the categorical string data into numerical data or index?

In [85]:
data = [('cat',), ('dog',), ('mouse',), ('fish',), ('dog',), ('cat',), ('mouse',)]
df = spark.createDataFrame(data, ["animal"])
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol='animal', outputCol='animalIndex')
indexed = indexer.fit(df).transform(df)
indexed.show()


+------+-----------+
|animal|animalIndex|
+------+-----------+
|   cat|        0.0|
|   dog|        1.0|
| mouse|        2.0|
|  fish|        3.0|
|   dog|        1.0|
|   cat|        0.0|
| mouse|        2.0|
+------+-----------+




64. How to calculate Correlation of two variables in a DataFrame?

In [86]:
data = [Row(feature1=5, feature2=10, feature3=25),
Row(feature1=6, feature2=15, feature3=35),
Row(feature1=7, feature2=25, feature3=30),
Row(feature1=8, feature2=20, feature3=60),
Row(feature1=9, feature2=30, feature3=70)]
df = spark.createDataFrame(data)
correlation = df.corr("feature1", "feature2")

print("Correlation between feature1 and feature2 :", correlation)

Correlation between feature1 and feature2 : 0.9


65. How to calculate Correlation Matrix?

In [114]:
from pyspark.sql import SparkSession
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Create a SparkSession
spark = SparkSession.builder.appName("Correlation Matrix").getOrCreate()

# Create a sample DataFrame
data = [
    (1.0, 2.0, 3.0),
    (4.0, 5.0, 6.0),
    (7.0, 8.0, 9.0),
    (10.0, 11.0, 12.0)
]

df = spark.createDataFrame(data, ["feature1", "feature2", "feature3"])

# Create a VectorAssembler to combine the feature columns
assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")

# Transform the DataFrame to include the new features column
df_assembled = assembler.transform(df)

# Calculate the correlation matrix
corr_matrix = Correlation.corr(df_assembled, "features").collect()[0]

# Print the correlation matrix
print("Correlation Matrix:")
print(corr_matrix)

# Stop the SparkSession
spark.stop()

                                                                                

Correlation Matrix:
Row(pearson(features)=DenseMatrix(3, 3, [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], False))


66. How to calculate VIF (Variance Inflation Factor ) for set of variables in a DataFrame?

67. How to perform Chi-Square test?

In [102]:
from pyspark.ml.feature import VectorAssembler
data = [(1, 0, 0, 1, 1),
(2, 0, 1, 0, 0),
(3, 1, 0, 0, 0),
(4, 0, 0, 1, 1),
(5, 0, 1, 1, 0)]

df = spark.createDataFrame(data, ["id", "feature1", "feature2", "feature3", "label"])
assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3"], outputCol="features")
df = assembler.transform(df)
from pyspark.ml.stat import ChiSquareTest
r = ChiSquareTest.test(df, "features", "label").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))

pValues: [0.36131042852617856,0.13603712811414348,0.1360371281141436]
degreesOfFreedom: [1, 1, 1]
statistics: [0.8333333333333335,2.2222222222222228,2.2222222222222223]


68. How to calculate the Standard Deviation?

In [101]:
data = [("James", "Sales", 3000),
("Michael", "Sales", 4600),
("Robert", "Sales", 4100),
("Maria", "Finance", 3000),
("James", "Sales", 3000),
("Scott", "Finance", 3300),
("Jen", "Finance", 3900),
("Jeff", "Marketing", 3000),
("Kumar", "Marketing", 2000),
("Saif", "Sales", 4100)]
df = spark.createDataFrame(data, ["Employee", "Department", "Salary"])
from pyspark.sql.functions import stddev
salary_stddev = df.select(stddev("Salary").alias("stddev"))
salary_stddev.show()


+-----------------+
|           stddev|
+-----------------+
|765.9416862050705|
+-----------------+



69. How to calculate missing value percentage in each column?

In [98]:
data = [("John", "Doe", None),
(None, "Smith", "New York"),
("Mike", "Smith", None),
("Anna", "Smith", "Boston"),
(None, None, None)]
df = spark.createDataFrame(data, ["FirstName", "LastName", "City"])
total_rows = df.count()
for column in df.columns:
    null_values = df.filter(df[column].isNull()).count()
    missing_percentage = (null_values / total_rows) * 100
    print(f"Missing values in {column}: {missing_percentage}%")

Missing values in FirstName: 40.0%
Missing values in LastName: 20.0%
Missing values in City: 60.0%


70. How to get the names of DataFrame objects that have been created in an environment?

In [100]:
dataframe_names = [name for name, obj in globals().items() if isinstance(obj, pyspark.sql.DataFrame)]

for name in dataframe_names:
    print(name)

df
df_with_index
res
df1
binned_df
df_filtered
df_A
df_B
df_first_diff
df_second_diff
df2
pivot_df
missing
sumdf
fdf
df_pu
df_single
frequency_table
dfdia
indexerModel
encoded_df
pysparkDF
df_train
df_test
indexed
salary_stddev
