In [40]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, year, to_date, broadcast, lit, split, when, max,min, to_timestamp, avg
import findspark
findspark.init()
# create a SparkSession
spark = SparkSession.builder.appName("PlayStoreAppData").getOrCreate()


In [2]:
# load a CSV file into a DataFrame
df = spark.read.csv("Data/google-play-dataset-by-tapivedotcom.csv",  header=True)


In [3]:
#Show list of columns name
print(df.columns)

['_c0', 'appId', 'developer', 'developerId', 'developerWebsite', 'free', 'genre', 'genreId', 'inAppProductPrice', 'minInstalls', 'offersIAP', 'originalPrice', 'price', 'ratings', 'len screenshots', 'adSupported', 'containsAds', 'reviews', 'releasedDayYear', 'sale', 'score', 'summary', 'title', 'updated', 'histogram1', 'histogram2', 'histogram3', 'histogram4', 'histogram5', 'releasedDay', 'releasedYear', 'releasedMonth', 'dateUpdated', 'minprice', 'maxprice', 'ParseReleasedDayYear']


In [4]:
#Rename the index column
df = df.withColumnRenamed("_c0", "Index")
print(df.columns)

['Index', 'appId', 'developer', 'developerId', 'developerWebsite', 'free', 'genre', 'genreId', 'inAppProductPrice', 'minInstalls', 'offersIAP', 'originalPrice', 'price', 'ratings', 'len screenshots', 'adSupported', 'containsAds', 'reviews', 'releasedDayYear', 'sale', 'score', 'summary', 'title', 'updated', 'histogram1', 'histogram2', 'histogram3', 'histogram4', 'histogram5', 'releasedDay', 'releasedYear', 'releasedMonth', 'dateUpdated', 'minprice', 'maxprice', 'ParseReleasedDayYear']


In [5]:
df.printSchema()

root
 |-- Index: string (nullable = true)
 |-- appId: string (nullable = true)
 |-- developer: string (nullable = true)
 |-- developerId: string (nullable = true)
 |-- developerWebsite: string (nullable = true)
 |-- free: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- genreId: string (nullable = true)
 |-- inAppProductPrice: string (nullable = true)
 |-- minInstalls: string (nullable = true)
 |-- offersIAP: string (nullable = true)
 |-- originalPrice: string (nullable = true)
 |-- price: string (nullable = true)
 |-- ratings: string (nullable = true)
 |-- len screenshots: string (nullable = true)
 |-- adSupported: string (nullable = true)
 |-- containsAds: string (nullable = true)
 |-- reviews: string (nullable = true)
 |-- releasedDayYear: string (nullable = true)
 |-- sale: string (nullable = true)
 |-- score: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- title: string (nullable = true)
 |-- updated: string (nullable = true)
 |-- histogram1:

    The above cell output shows that fields like price, rating, minprice, maxprice, minInstalls etc
    are string we need to convert them to there required datatype like price, score,rating,minprice 
    etc should be converted to float or int, ParseReleasedDayYear and dateUpdated should be converted 
    to timestamp

In [6]:
# Casting column datatype to sutiable datatype
#Float
df = df.withColumn("price", col("price").cast("float"))
df = df.withColumn("score", col("score").cast("float"))
df = df.withColumn("minprice", col("minprice").cast("float"))
df = df.withColumn("maxprice", col("maxprice").cast("float"))
#Int
df = df.withColumn("free", col("free").cast("int"))
df = df.withColumn("ratings", col("ratings").cast("int"))
df = df.withColumn("minInstalls", col("minInstalls").cast("int"))
df = df.withColumn("adSupported", col("adSupported").cast("int"))
df = df.withColumn("containsAds", col("containsAds").cast("int"))
df = df.withColumn("reviews", col("reviews").cast("int"))
df = df.withColumn("releasedYear", col("releasedYear").cast("int"))


# Parse ReleasedDayYear column to timestamp and date (assuming format "yyyy-MM-dd")
df = df.withColumn("ParseReleasedDayYear", to_timestamp(col("ParseReleasedDayYear")))
df = df.withColumn("dateUpdated", to_timestamp(col("dateUpdated")))

In [7]:
#Print Schema to validate chages
df.printSchema()

root
 |-- Index: string (nullable = true)
 |-- appId: string (nullable = true)
 |-- developer: string (nullable = true)
 |-- developerId: string (nullable = true)
 |-- developerWebsite: string (nullable = true)
 |-- free: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- genreId: string (nullable = true)
 |-- inAppProductPrice: string (nullable = true)
 |-- minInstalls: integer (nullable = true)
 |-- offersIAP: string (nullable = true)
 |-- originalPrice: string (nullable = true)
 |-- price: float (nullable = true)
 |-- ratings: integer (nullable = true)
 |-- len screenshots: string (nullable = true)
 |-- adSupported: integer (nullable = true)
 |-- containsAds: integer (nullable = true)
 |-- reviews: integer (nullable = true)
 |-- releasedDayYear: string (nullable = true)
 |-- sale: string (nullable = true)
 |-- score: float (nullable = true)
 |-- summary: string (nullable = true)
 |-- title: string (nullable = true)
 |-- updated: string (nullable = true)
 |-- histogr

In [8]:
filtered_df = df.withColumn(
    "releasedyear_column",
    when(
        (col("releasedYear") >= 2000) & (col("releasedYear") <= 2023),
        col("releasedYear")
    ).otherwise(None)
)


In [9]:
filtered_df.select("releasedyear_column").count()


3460966

In [10]:
max_filtered_date = filtered_df.agg({"minInstalls": "max"}).collect()[0]["max(minInstalls)"]
min_filtered_date = filtered_df.agg({"minInstalls": "min"}).collect()[0]["min(minInstalls)"]

In [11]:
max_filtered_date

1000000000

In [12]:
min_filtered_date

0

In [13]:
bucket_df = filtered_df

In [14]:
#Bucketing the data
bucket_df = bucket_df.withColumn(
    "year_bucket",
    when((col("releasedyear_column") >= 2000) & (col("releasedyear_column") <= 2005), "[2000-2005]")
    .when((col("releasedyear_column") >= 2006) & (col("releasedyear_column") <= 2010), "[2006-2010]")
    .when((col("releasedyear_column") >= 2011) & (col("releasedyear_column") <= 2015), "[2011-2015]")
    .when((col("releasedyear_column") >= 2016) & (col("releasedyear_column") <= 2020), "[2016-2020]")
    .when((col("releasedyear_column") >= 2021) & (col("releasedyear_column") <= 2025), "[2021-2025]")
    .otherwise(None)
)

bucket_df = bucket_df.withColumn(
    "rating_bucket",
    when((col("ratings") >= 0) & (col("ratings") <= 1), "[rating between 0 to 1]")
    .when((col("ratings") >= 1.1) & (col("ratings") <= 2), "[rating between 1 to 2]")
    .when((col("ratings") >= 2.1) & (col("ratings") <= 3), "[rating between 2 to 3]")
    .when((col("ratings") >= 3.1) & (col("ratings") <= 4), "[rating between 3 to 4]")
    .when((col("ratings") >= 4.1) & (col("ratings") <= 5), "[rating between 4 to 5]")
    .otherwise(None)
)

bucket_df = bucket_df.withColumn(
    "price_bucket",
    when((col("price") >= 0) & (col("price") <= 5), "[price between 0 to 5]")
    .when((col("price") > 5) & (col("price") <= 10), "[price between 5 or 10]")
    .when((col("price") > 10) & (col("price") <= 15), "[price between 10 or 15]")
    .when((col("price") > 15) & (col("price") <= 20), "[price between 15 or 20]")
    .when((col("price") >20) , "[price above 20]")
    .otherwise(None)
)

bucket_df = bucket_df.withColumn(
    "minInstalls_bucket",
    when((col("minInstalls") >= 0) & (col("minInstalls") <= 1000), "[Install between 0 to 1000]")
    .when((col("minInstalls") >= 1001) & (col("minInstalls") <= 10000), "[Install between 1001 or 10000]")
    .when((col("minInstalls") >= 10001) & (col("minInstalls") <= 50000), "[Install between 10001 or 50000]")
    .when((col("minInstalls") >= 50001) & (col("minInstalls") <= 100000), "[Install between 50001 or 100000]")
    .when((col("minInstalls") >= 100001) & (col("minInstalls") <= 1000000), "[Install between 100001 or 1000000]")
    .when((col("minInstalls") >= 1000001) , "[Install above 1000001]")
    .otherwise(None)
)

In [15]:
bucket_df.count()

3460966

In [16]:
year_bucket_count = bucket_df.groupBy("year_bucket").count()
rating_bucket_count = bucket_df.groupBy("rating_bucket").count()
price_bucket_count = bucket_df.groupBy("price_bucket").count()
install_bucket_count = bucket_df.groupBy("minInstalls_bucket").count()

In [17]:
year_bucket_count.show()
rating_bucket_count.show()
price_bucket_count.show()
install_bucket_count.show()

+-----------+-------+
|year_bucket|  count|
+-----------+-------+
|       null|  16288|
|[2016-2020]|1873870|
|[2006-2010]|   4820|
|[2021-2025]|1302342|
|[2011-2015]| 263646|
+-----------+-------+

+--------------------+-------+
|       rating_bucket|  count|
+--------------------+-------+
|[rating between 0...|3005099|
|                null| 450854|
|[rating between 4...|   5002|
|[rating between 2...|      3|
|[rating between 3...|      1|
|[rating between 1...|      7|
+--------------------+-------+

+--------------------+-------+
|        price_bucket|  count|
+--------------------+-------+
|[price between 0 ...|3432542|
|                null|   9628|
|[price between 5 ...|  11212|
|[price between 15...|   1270|
|    [price above 20]|   4100|
|[price between 10...|   2214|
+--------------------+-------+

+--------------------+-------+
|  minInstalls_bucket|  count|
+--------------------+-------+
|                null|    416|
|[Install between ...|2692925|
|[Install between ...|  

In [18]:
#Filter out combinations which are smaller than 2% of total volume.
filter_bucket_count = int(0.02 * bucket_df.count())
print(filter_bucket_count)
filtered_year_bucket = year_bucket_count.filter(col("count") > filter_bucket_count)
filtered_rating_bucket = rating_bucket_count.filter(col("count") > filter_bucket_count)
filtered_price_bucket = price_bucket_count.filter(col("count") > filter_bucket_count)
filtered_install_bucket = install_bucket_count.filter(col("count") > filter_bucket_count)

rating_list = filtered_rating_bucket.select("rating_bucket").rdd.flatMap(lambda x: x).collect()
price_list = filtered_price_bucket.select("price_bucket").rdd.flatMap(lambda x: x).collect()
year_list = filtered_year_bucket.select("year_bucket").rdd.flatMap(lambda x: x).collect()
install_list = filtered_install_bucket.select("minInstalls_bucket").rdd.flatMap(lambda x: x).collect()

print(rating_list)
print(price_list)
print(year_list)
print(install_list)




69219
['[rating between 0 to 1]', None]
['[price between 0 to 5]']
['[2016-2020]', '[2021-2025]', '[2011-2015]']
['[Install between 0 to 1000]', '[Install between 1001 or 10000]', '[Install between 50001 or 100000]', '[Install between 10001 or 50000]']


In [19]:

# Filter the DataFrame
Cleaned_df = bucket_df.filter((col("year_bucket").isin(year_list)) &
                        (col("rating_bucket").isin(rating_list)) &
                        (col("price_bucket").isin(price_list)) &
                        (col("minInstalls_bucket").isin(install_list)))

# Show the filtered DataFrame
Cleaned_df.count()

2976568

In [42]:
from itertools import combinations

# List of important columns
important_columns = ["year_bucket", "rating_bucket", "price_bucket", "genre", "minInstalls_bucket","price",
                    "ratings", "developerWebsite", "free", "reviews", "score" ,"appId"]

# Generate all possible combinations of 1 to 12 columns
all_column_combinations = [c for r in range(1, 13) for c in combinations(important_columns, r)]

# Iterate through each combination and create a new DataFrame
for column_combination in all_column_combinations:
    selected_columns_df = Cleaned_df.select(*column_combination)

selected_columns_df.show(5)


+-----------+--------------------+--------------------+------------+--------------------+-----+-------+--------------------+----+-------+-----+--------------------+
|year_bucket|       rating_bucket|        price_bucket|       genre|  minInstalls_bucket|price|ratings|    developerWebsite|free|reviews|score|               appId|
+-----------+--------------------+--------------------+------------+--------------------+-----+-------+--------------------+----+-------+-----+--------------------+
|[2016-2020]|[rating between 0...|[price between 0 ...|Art & Design|[Install between ...|  0.0|      0|                None|   1|      0|  0.0|freeappscollectio...|
|[2016-2020]|[rating between 0...|[price between 0 ...|Art & Design|[Install between ...|  0.0|      0|https://dailyshar...|   1|      0|  0.0|in.letstartup.nai...|
|[2016-2020]|[rating between 0...|[price between 0 ...|Art & Design|[Install between ...|  0.0|      0|                None|   1|      0|  0.0|com.inforotor.saa...|
|[2016-202

In [43]:
#Question 1: How many apps are free, belong to a certain genre, and were launched in a particular year?

Answer_one = selected_columns_df.filter((col('free') == 1) & (col('genre') == 'Art & Design'))
Answer_one = Answer_one.select('appId','genre','year_bucket')
Answer_one.coalesce(1).write.csv('output/answer1.csv', header=True, mode="overwrite")


In [48]:
#Question 2: How many apps fall within a 0 to 5 price range, were released in 
# between 2011-2015, have Instalation between 50001 or 100000?

Answer2 = selected_columns_df.filter(
    (col('price_bucket') == '[price between 0 to 5]' ) &
    (col('year_bucket') == '[2011-2015]') &
    (col('minInstalls_bucket') == '[Install between 50001 or 100000]') 
)
Answer_Two = Answer2.select('appId','year_bucket','minInstalls_bucket')
Answer_Two.coalesce(1).write.csv('output/answer2.csv', header=True, mode="overwrite")


In [49]:
#Question 3: What is the average rating of apps for each genre?
average_ratings_per_genre = selected_columns_df.groupBy('genre').agg(avg('ratings').alias('average_rating'))
Answer_Three = average_ratings_per_genre.select('genre','average_rating')
Answer_Three.coalesce(1).write.csv('output/answer3.csv', header=True, mode="overwrite")
