In [0]:
from pyspark.sql.functions import regexp_replace, length,expr,lit,coalesce,count,avg,max,col,sum


In [0]:
wp_path = '/mnt/deBDProject/BI_Users/WorldPopulation.json'
world_population = spark.read.json(wp_path)
world_population = world_population.withColumn('CountryUnderScore', regexp_replace('Country', ' ', '_'))
# world_population.show(5)

In [0]:

users_path = '/mnt/deBDProject/ml_training_project/Users.csv'
users = spark.read.csv(users_path,header=True)

locations_cleaned = users.select("id", "displayname","location","reputation","views","upvotes","downvotes")
locations_cleaned= locations_cleaned.withColumn("LocationCommaCount" ,length(locations_cleaned["location"]) - length(regexp_replace(locations_cleaned["location"], ",", "")))

# locations_cleaned.show(10)


In [0]:
#Retrieve Countries from column and put in one list
locationsOneWord = locations_cleaned[locations_cleaned["locationCommaCount"]==0]
locationsTwoWords = locations_cleaned[locations_cleaned["locationCommaCount"]==1.0]
locationsTwoWords = locationsTwoWords.withColumn("location",expr("split(location,',')[1]"))
locationsThreeWords = locations_cleaned[locations_cleaned["locationCommaCount"]==2.0]
locationsThreeWords = locationsThreeWords.withColumn("location",expr("split(location,',')[2]"))
locationsFourWords = locations_cleaned[locations_cleaned["locationCommaCount"]==3.0]
locationsFourWords = locationsFourWords.withColumn("location",expr("split(location,',')[3]"))
locationsNA = locations_cleaned[locations_cleaned["locationCommaCount"].isNull() ]
locationsNA = locationsNA.withColumn("location", lit("N/A"))
locations_Countries = locationsOneWord.union(locationsTwoWords)\
                        .union(locationsThreeWords)\
                        .union(locationsFourWords)\
                        .union(locationsNA)

locations_Countries = locations_Countries.withColumnRenamed("location","Country_Name")


In [0]:
#Clean Up Records in the Location Column using external file that contains rules

countries_to_replace_path = '/mnt/deBDProject/BI_Users/countries_to_rename.csv'
countries_to_replace = spark.read.csv(countries_to_replace_path,header=True)

for index, row in countries_to_replace.toPandas().iterrows():
    locations_Countries =locations_Countries.withColumn("Country_Name", regexp_replace('Country_Name',row[0], row[1]))


In [0]:
locations_Countries = locations_Countries.withColumn("CountryUnderScore", regexp_replace("Country_Name"," ","_"))

In [0]:
final_df = locations_Countries.join(world_population, locations_Countries["CountryUnderScore"] == world_population["CountryUnderScore"], "left").fillna({'Population': 0, 'Country': 'N/A'})

final_df = final_df.select(final_df["Country"].alias("Country_Name"),final_df["Id"],final_df["reputation"], "Population")

final_df.show(10)

+--------------+----------+----------+
|  Country_Name|reputation|Population|
+--------------+----------+----------+
|           N/A|        21|         0|
|           N/A|        16|         0|
|United Kingdom|       774|  67736802|
|           N/A|         1|         0|
|           N/A|         1|         0|
| United States|       360| 339996564|
|           N/A|         1|         0|
|           N/A|        21|         0|
|           N/A|       384|         0|
|           N/A|        13|         0|
+--------------+----------+----------+
only showing top 10 rows



In [0]:

grouped_df = final_df.groupBy("Country_Name").agg(count(final_df.Country_Name).alias("User_Count"),\
                                sum(final_df.reputation).alias("User_Reputation"),\
                                max(final_df.Population).alias("Population")
                                )


In [0]:
# grouped_df.show(200)

+--------------------+----------+---------------+----------+
|        Country_Name|User_Count|User_Reputation|Population|
+--------------------+----------+---------------+----------+
|         Afghanistan|        27|         5246.0|  42239854|
|             Albania|        64|        50165.0|   2832439|
|             Algeria|       133|        42612.0|  45606481|
|             Andorra|        11|        55736.0|     80088|
|              Angola|         8|         1703.0|  36684203|
| Antigua and Barbuda|         1|           36.0|     94298|
|           Argentina|      1087|       751519.0|  45773884|
|             Armenia|       128|       159211.0|   2777971|
|           Australia|      3136|      4376135.0|  26439112|
|             Austria|      1126|      1270759.0|   8958961|
|          Azerbaijan|        78|        17820.0|  10412652|
|             Bahamas|         5|          127.0|    412624|
|             Bahrain|        91|        38768.0|   1485510|
|          Bangladesh|  

In [0]:

grouped_df.write.mode("overwrite")\
    .option("header",True)\
    .csv('/mnt/deBDProject/BI_Users/UserGeolocation')

In [0]:
from pyspark.sql import Window
import pyspark.sql.functions as F

detailed_df = locations_Countries.select("Country_Name","CountryUnderScore","id","displayname","reputation","views","upvotes","downvotes").join(world_population, locations_Countries["CountryUnderScore"] == world_population["CountryUnderScore"], "left").fillna({'Population': 0, 'Country': 'N/A'})

detailed_non_null_df = detailed_df[detailed_df["Country"] != 'N/A']

# detailed_non_null_df.show(15)


+--------------+-----------------+------+--------------------+----------+-----+-------+---------+--------------+----------+-----------------+
|  Country_Name|CountryUnderScore|    id|         displayname|reputation|views|upvotes|downvotes|       Country|Population|CountryUnderScore|
+--------------+-----------------+------+--------------------+----------+-----+-------+---------+--------------+----------+-----------------+
|         India|            India|518997|             Nishant|       407|   85|     26|        0|         India|1428627663|            India|
|        Israel|           Israel|519002|                 Ron|      1070|  569|    181|        3|        Israel|   9174520|           Israel|
|     Argentina|        Argentina|519036|              Emilio|         6|    7|      0|        0|     Argentina|  45773884|        Argentina|
| United States|    United_States|519037|            Valdis R|      1504|   79|    129|        4| United States| 339996564|    United_States|
|     

In [0]:
detailed_partitioned_df = detailed_non_null_df.select("Country_Name","id","displayname","reputation","views","upvotes","downvotes")\
    .withColumn('rank', F.dense_rank().over(Window.partitionBy("Country_Name").orderBy(col('reputation').cast("int").desc())))\
    .filter(F.col('rank')<=5)\

# detailed_partitioned_df.show(20)

+------------+-------+---------------+----------+-----+-------+---------+----+
|Country_Name|     id|    displayname|reputation|views|upvotes|downvotes|rank|
+------------+-------+---------------+----------+-----+-------+---------+----+
| Afghanistan|2169355|  Mustafa Ehsan|      1969|  175|    173|        5|   1|
| Afghanistan|2198400|        Fshamri|       565|   52|     24|        1|   2|
| Afghanistan|3129780|Hanzallah Afgan|       546|  185|     87|       15|   3|
| Afghanistan|2110828|  Ajmal Amirzad|       474|   64|     11|        8|   4|
| Afghanistan|1746091|    ali shekari|       440|   97|    165|        4|   5|
|     Albania| 645186|           Shef|     36150| 2505|   1391|      309|   1|
|     Albania|2736039|       Ultimo_m|      2690|  385|   2265|       26|   2|
|     Albania|2212128| Freeman Lambda|      2167|  230|    408|       32|   3|
|     Albania|1973164|            TGO|      1819|  366|     65|        5|   4|
|     Albania| 930943|           Hari|      1345|  3

In [0]:
detailed_partitioned_df.write.mode("overwrite")\
    .option("header",True)\
    .csv('/mnt/deBDProject/BI_Users/TopUsersPerCountry')