In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [2]:
import pyspark
import pyspark.sql  as pyspark_sql
import pyspark.sql.types as pyspark_types
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import *



In [3]:
# create the session
conf = SparkConf()

# create the context
sc = pyspark.SparkContext(conf=conf)
spark = pyspark_sql.SparkSession.builder.getOrCreate()

In [5]:
# from google.colab import drive
# drive.mount('/content/drive/')

Mounted at /content/drive/


In [None]:
df_full=spark.read.format("csv").option("header",True).option("inferSchema",True).option("multiLine",True).option("escapeQuotes",True).option("escape", "\"").load("playstore.csv")

In [None]:
df_full.orderBy("_c0",ascending=False).show()

In [None]:
df_full.printSchema()

In [169]:
df=df_full

In [59]:
# Calculate total volume
total_volume = df.count()
print(total_volume)

3451063


In [122]:
# Binning function for numerical fields
def bin_numerical_field(field, bin_size):
    return expr(f"floor({field}/{bin_size})*{bin_size}")

In [155]:
def calculate_bin_size(df, numerical_field, num_of_bins = 10):
    # Find the range of values in the numerical field
    max_value = df.agg(ceil(max(col(numerical_field).cast('int'))).alias("max_val")).head().asDict()['max_val']
    min_value = df.agg(floor(min(col(numerical_field).cast('int'))).alias("min_val")).head().asDict()['min_val']

    # Calculate the bin size
    bin_size = (max_value - min_value) // num_of_bins
    # df.withColumn("x", bin_numerical_field("releasedYear", 2)).groupBy("x").agg(count("appId").alias("count")).where(col("count")<=20).show()
    if bin_size>0:
      bin_size=bin_size
    else:
      bin_size=1
    return bin_size

In [197]:
bin_dict={}
bin_dict["minInstalls"] = calculate_bin_size(df, "minInstalls")
bin_dict["price"] = calculate_bin_size(df, "price")
bin_dict["ratings"] = calculate_bin_size(df, "ratings")
bin_dict["reviews"] = calculate_bin_size(df, "reviews")
bin_dict["score"] = calculate_bin_size(df, "score")
bin_dict["releasedYear"] = calculate_bin_size(df, "releasedYear",5)
bin_dict["maxprice"] = calculate_bin_size(df, "maxprice")

minInstalls 1410065408 0 10
price 1100 0 10
ratings 166417449 0 10
reviews 4394018 0 10
score 5 0 10
releasedYear 2022 2009 5
maxprice 1024 0 10
{'minInstalls': 141006540, 'price': 110, 'ratings': 16641744, 'reviews': 439401, 'score': 1, 'releasedYear': 2, 'maxprice': 102}


In [198]:
# df.withColumn("x", bin_numerical_field("minInstalls", 141006540)).withColumn('y',col("x")+1).withColumn("z",concat_ws("-",col("x"),col("y"))).show()

In [199]:
df_test = df
for i in bin_dict.keys():
  df_test = df_test.withColumn(i+"x", bin_numerical_field(i, bin_dict[i])).withColumn(i+'y',col(i+"x")+bin_dict[i]).withColumn(i,concat(lit("["),col(i+"x"),lit("-"),col(i+"y"),lit("]")))
df_binned = df_test.select("appId","sale", "free", "genre", "minInstalls", "offersIAP", "price",  "ratings","adSupported", "containsAds", "reviews", "score","releasedYear")

In [200]:

# Calculate counts for various combinations
result = (
    df_binned.groupBy("sale", "free", "genre", "minInstalls", "offersIAP", "price",  "ratings","adSupported", "containsAds", "reviews", "score","releasedYear")
    .agg(count("appId").alias("count"))
)

In [201]:
result.orderBy('count',ascending=False).show()

+----+----+-----------------+-------------+---------+-------+------------+-----------+-----------+----------+-----+------------+------+
|sale|free|            genre|  minInstalls|offersIAP|  price|     ratings|adSupported|containsAds|   reviews|score|releasedYear| count|
+----+----+-----------------+-------------+---------+-------+------------+-----------+-----------+----------+-----+------------+------+
|   0|   1|        Education|[0-141006540]|        0|[0-110]|[0-16641744]|          0|          0|[0-439401]|[0-1]| [2020-2022]|105607|
|   0|   1|         Business|[0-141006540]|        0|[0-110]|[0-16641744]|          0|          0|[0-439401]|[0-1]| [2020-2022]| 90961|
|   0|   1|         Shopping|[0-141006540]|        0|[0-110]|[0-16641744]|          0|          0|[0-439401]|[0-1]| [2020-2022]| 74014|
|   0|   1|    Music & Audio|[0-141006540]|        0|[0-110]|[0-16641744]|          1|          1|[0-439401]|[0-1]| [2020-2022]| 63625|
|   0|   1|         Business|[0-141006540]|     

In [204]:
result.count()

10081

In [None]:
# # Filter out combinations smaller than 2% of the total volume
# result_filtered = result.filter(col("count") / total_volume >= 0.02)
# result_filtered.count()

In [209]:
interim_result = result
for i in result.columns:
  if i != 'count':
    interim_result=interim_result.withColumn(i,concat(lit(i),lit(':'),col(i)))
interim_result.show()

+------+------+--------------------+--------------------+-----------+-------------+--------------------+-------------+-------------+------------------+-----------+--------------------+-----+
|  sale|  free|               genre|         minInstalls|  offersIAP|        price|             ratings|  adSupported|  containsAds|           reviews|      score|        releasedYear|count|
+------+------+--------------------+--------------------+-----------+-------------+--------------------+-------------+-------------+------------------+-----------+--------------------+-----+
|sale:0|free:1|genre:Auto & Vehi...|minInstalls:[0-14...|offersIAP:0|price:[0-110]|ratings:[0-16641744]|adSupported:1|containsAds:1|reviews:[0-439401]|score:[2-3]|releasedYear:[201...|   17|
|sale:0|free:1|      genre:Business|minInstalls:[0-14...|offersIAP:1|price:[0-110]|ratings:[0-16641744]|adSupported:1|containsAds:1|reviews:[0-439401]|score:[3-4]|releasedYear:[201...|   32|
|sale:0|free:1|  genre:Productivity|minInstal

In [210]:
# Convert the result to a CSV string
result_csv = (
    interim_result.withColumn("name", concat_ws("; ","sale", "free", "genre", "minInstalls", "offersIAP", "price",  "ratings",
                                        "adSupported", "containsAds", "reviews", "score","releasedYear")).select("name","count")
)

In [211]:
result_csv.orderBy(count).show()

+--------------------+-----+
|                name|count|
+--------------------+-----+
|sale:0; free:1; g...|   17|
|sale:0; free:1; g...|   32|
|sale:0; free:1; g...|  813|
|sale:0; free:1; g...|  313|
|sale:0; free:1; g...|  536|
|sale:0; free:1; g...|   25|
|sale:0; free:1; g...|   28|
|sale:0; free:1; g...|  121|
|sale:0; free:1; g...|   52|
|sale:0; free:1; g...|  131|
|sale:0; free:1; g...|  286|
|sale:0; free:0; g...|  105|
|sale:0; free:1; g...|  118|
|sale:0; free:1; g...|   27|
|sale:0; free:0; g...|   55|
|sale:0; free:1; g...|   35|
|sale:0; free:1; g...|   10|
|sale:0; free:1; g...|   51|
|sale:0; free:1; g...|    1|
|sale:0; free:1; g...|    1|
+--------------------+-----+
only showing top 20 rows



In [212]:
result_csv.write.format("csv").option("header",True).option("multiLine",True).save("results.csv")

In [213]:
# Stop the Spark session
spark.stop()