## ASM1: Analysing the Habits and Behaviour of "StackOverflow" Users
### Summary:
- Given data: .CSV files (1-2 GB) that store the questions and answers posted by users on various topics on the StackOverflow website.
- Requirements:
    - Store the given data in a MongoDB database.
    - Use Apache Spark to process the large data set from the database and compute the business queries.

### 1. Import data to MongoDB database
- Use mongoimport (a MongoDB database tool) to import the content of each .CSV file into the MongoDB database. The data is converted to JSON documents, using the headers as field names.
- Run mongoimport from the system command line, not the mongo shell:
    - mongoimport --type csv --db StackOverFlow --collection Answers --headerline --drop --file `[your path]`/Answers.csv
    - mongoimport --type csv --db StackOverFlow --collection Questions --headerline --drop --file `[your path]`/Questions.csv
- Results:
    - Database: StackOverFlow.
    - Collections: Answers, Questions.
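
The two import commands differ only in the collection name and the file path, so they can be generated from one template. The following is a minimal sketch, assuming `mongoimport` is available on the PATH; the helper names `build_mongoimport_cmd` and `import_csv` and the `/data` directory are hypothetical and not part of the original notebook. The flags are the ones used in the commands above.

```python
import subprocess

def build_mongoimport_cmd(collection, csv_path, db="StackOverFlow"):
    """Return the mongoimport argument list used in this section (hypothetical helper)."""
    return [
        "mongoimport",
        "--type", "csv",
        "--db", db,
        "--collection", collection,
        "--headerline",   # use the first CSV row as field names
        "--drop",         # drop the collection before importing
        "--file", csv_path,
    ]

def import_csv(collection, csv_path):
    # Runs the tool from the system command line; raises if mongoimport exits with an error.
    subprocess.run(build_mongoimport_cmd(collection, csv_path), check=True)

if __name__ == "__main__":
    # "/data" is a placeholder directory, not a path from the original notebook.
    for name in ("Answers", "Questions"):
        print(" ".join(build_mongoimport_cmd(name, f"/data/{name}.csv")))
```

Passing the arguments as a list avoids shell quoting problems when the file path contains spaces.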

### 2. Read the data from MongoDB, using Spark
- Specify the connector configuration via SparkSession.
- Set up a reader connection to the local MongoDB port and the StackOverFlow database.

In [2]:
# Import the libraries needed to create a Spark session in Jupyter Notebook

import os
import findspark

# Point JAVA_HOME (Java 11) and SPARK_HOME at the local installations so the Spark engine can run

os.environ["JAVA_HOME"] = "/usr/local/Cellar/openjdk@11/11.0.19"
os.environ["SPARK_HOME"] = "/usr/local/Cellar/apache-spark/3.4.0/libexec"

findspark.init()

# Create a Spark session that uses the local machine and all CPU cores.
# The session opens a reader connection to MongoDB on localhost, "StackOverFlow" database.

from pyspark.sql import SparkSession, Window
from pyspark import SparkConf
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType, DoubleType, ArrayType

myConf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('ASM1')

spark = SparkSession \
        .builder \
        .config(conf=myConf) \
        .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1:27017/") \
        .config("spark.mongodb.read.database", "StackOverFlow") \
        .getOrCreate()

### 3. Normalize the data
- Read the two collections through the reader connection.
- Store each collection's data in the corresponding dataframe.
- Correct the data types and convert N/A values to null.
- The final data is stored in:
    - questionsDF
    - answersDF

In [3]:
# Using the reader connection, create dataframes from the MongoDB collections and cast the columns to appropriate types.

questionsRawDF = spark.read.format("mongodb").option("spark.mongodb.read.collection", "Questions").load()

questionsDF = questionsRawDF \
    .withColumn("CreationDate", func.to_date(func.col("CreationDate"))) \
    .withColumn("ClosedDate", func.to_date(func.col("ClosedDate"))) \
    .withColumn("OwnerUserId", func.col("OwnerUserId").cast("int")) \
    .drop("_id")

#questionsDF.printSchema()

answersRawDF = spark.read.format("mongodb").option("spark.mongodb.read.collection", "Answers").load()

answersDF = answersRawDF \
    .withColumn("CreationDate", func.to_date(func.col("CreationDate"))) \
    .withColumn("OwnerUserId", func.col("OwnerUserId").cast("int")) \
    .drop("_id")

#answersDF.printSchema()

root
 |-- Body: string (nullable = true)
 |-- ClosedDate: date (nullable = true)
 |-- CreationDate: date (nullable = true)
 |-- Id: integer (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Title: string (nullable = true)

root
 |-- Body: string (nullable = true)
 |-- CreationDate: date (nullable = true)
 |-- Id: integer (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- ParentId: integer (nullable = true)
 |-- Score: integer (nullable = true)

### 4. Find the frequency of programming languages in the questions data
- Count each occurrence of a programming language name in a question's body.
- Each question's body is split on spaces (' ') and exploded into one element per row.
- The final data is stored in:
    - totalLanguageDF
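
Because the count is based on a plain substring pattern per token, a short name such as "Java" is also found inside "Javascript". The following plain-Python sketch, which is not the notebook's Spark code, shows one way to check the matching logic on a small sample before running it on the full data. The function name `count_languages` and the lookaround-based boundaries are assumptions made for illustration.

```python
import re
from collections import Counter

LANGUAGES = ["Java", "Python", "C++", "C#", "Go", "Ruby",
             "Javascript", "PHP", "HTML", "CSS", "SQL"]

def count_languages(bodies):
    """Count tokens that mention each language as a whole name (hypothetical helper)."""
    # re.escape handles the '+' in "C++"; the lookarounds act as word boundaries
    # that also work for names ending in '+' or '#', where \b would not.
    patterns = {
        name: re.compile(rf"(?<![A-Za-z0-9]){re.escape(name)}(?![A-Za-z0-9+#])")
        for name in LANGUAGES
    }
    counts = Counter()
    for body in bodies:
        for token in body.split(" "):      # same space-based split as described above
            for name, pattern in patterns.items():
                if pattern.search(token):
                    counts[name] += 1
    return counts

if __name__ == "__main__":
    sample = ["I use Java and Javascript daily", "C++ or C# ? Going with Go."]
    print(count_languages(sample))
```

With these boundaries, the token "Javascript" counts only towards Javascript and "Going" does not count towards Go. Whether that stricter definition is wanted depends on the business query.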

In [26]:
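# Create a list of programming languages to iterate over.
# 'C++' is written as a raw string so that the regex escapes reach Spark unchanged.

languages = ['Java', 'Python', r'C\+\+', 'C#', 'Go', 'Ruby', 'Javascript', 'PHP', 'HTML', 'CSS', 'SQL']

# Split questions.Body into elements and apply a regular expression to each element to extract only the programming language name.
# Note: the patterns match substrings, so 'Java' also counts tokens such as 'Javascript' and 'Go' also counts tokens such as 'Google'.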

for i, name in enumerate(languages) :
    getName = questionsDF \
        .withColumn("explodeBody", func.explode(func.split(func.col("Body"), " "))) \
        .select(func.regexp_extract(func.col("explodeBody"), rf"({name})" , 1).alias("Programing Language")) \
        .filter(func.col("Programing Language") != "") \
        .groupBy("Programing Language") \
        .agg(func.count("*").alias("Count"))
    if i == 0:
        totalLanguageDF = getName
    else:
        totalLanguageDF = totalLanguageDF.unionByName(getName, allowMissingColumns=True)
        
totalLanguageDF.show()




+-------------------+-----+
|Programing Language|Count|
+-------------------+-----+
|               Java| 4475|
|             Python| 1651|
|                C++| 2233|
|                 C#| 2997|
|                 Go| 1946|
|               Ruby|  887|
|         Javascript|  497|
|                PHP| 2720|
|               HTML| 2746|
|                CSS|  903|
|                SQL| 6335|
+-------------------+-----+






### 5. Find all domains and the total number of times each appears in the questions data
- Count each occurrence of a domain in a question's body.
- Each question's body is split on spaces (' ') and exploded into one element per row.
- The final data is stored in:
    - totalDomainDF
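
The extraction depends on what the regular expression accepts as a domain. The plain-Python sketch below, which is not the notebook's Spark code, compares the pattern used in this section with a hypothetical alternative (`HOST_ONLY`) on a small sample. The helper name `count_domains` is an assumption for illustration.

```python
import re
from collections import Counter

ORIGINAL = re.compile(r"https?://(\S+?/)")      # pattern used in this section
HOST_ONLY = re.compile(r"https?://([^/\s]+)")   # hypothetical alternative, no trailing slash needed

def count_domains(bodies, pattern):
    """Count the first captured domain of every space-separated token."""
    counts = Counter()
    for body in bodies:
        for token in body.split(" "):
            match = pattern.search(token)
            if match:
                counts[match.group(1)] += 1
    return counts

if __name__ == "__main__":
    sample = ["see http://example.com/page and https://github.com",
              "http://example.com/"]
    print(count_domains(sample, ORIGINAL))
    print(count_domains(sample, HOST_ONLY))
```

The original pattern only matches URLs that contain a slash after the host, so a bare link such as `https://github.com` is skipped, and every captured domain keeps its trailing slash, as in the output table of this section.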

In [27]:
# Split questions.Body into elements and apply a regular expression to each element to extract only the domain name.

totalDomainDF = questionsDF \
        .withColumn("explodeBody", func.explode(func.split(func.col("Body"), " "))) \
        .select(func.regexp_extract(func.col("explodeBody"), r'https?://(\S+?/)' , 1).alias("Domain")) \
        .filter(func.col("Domain") != "") \
        .groupBy("Domain") \
        .agg(func.count("*").alias("Count")) \
        .sort(func.col("Count").desc())

totalDomainDF.show()


+--------------------+-----+
|              Domain|Count|
+--------------------+-----+
|  stackoverflow.com/| 1977|
|   en.wikipedia.org/|  760|
| msdn.microsoft.com/|  695|
|         www.w3.org/|  594|
|schemas.microsoft...|  262|
|    code.google.com/|  254|
|          localhost/|  236|
|        example.com/|  185|
|       java.sun.com/|  171|
|         github.com/|  133|
|     blogs.msdn.com/|  106|
|     www.google.com/|   97|
|  i.stack.imgur.com/|   91|
|        www.last.fm/|   79|
|schemas.xmlsoap.org/|   76|
|developer.apple.com/|   72|
|support.microsoft...|   66|
|www.codeproject.com/|   62|
|   www.codeplex.com/|   60|
|    www.example.com/|   60|
+--------------------+-----+
only showing top 20 rows



                                                                                


### 6. Calculate each user's total score by day
- Select only the needed data from the "questions" dataframe.
- Apply grouping, windowing and aggregate functions.
- The final data is stored in:
    - totalScoreDF
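
The running total is a cumulative sum of the daily scores within each user, taken in date order. When a Spark window has an ordering, its default frame runs from the start of the partition to the current row, which is what produces the cumulative behaviour. The plain-Python sketch below reproduces that logic on a few rows taken from the output of this section; the function name `running_totals` is hypothetical.

```python
from itertools import accumulate, groupby
from operator import itemgetter

def running_totals(rows):
    """rows: iterable of (user_id, iso_date, score_in_day) tuples."""
    # ISO dates (YYYY-MM-DD) sort chronologically as plain strings.
    ordered = sorted(rows, key=itemgetter(0, 1))
    result = []
    for _, group in groupby(ordered, key=itemgetter(0)):   # one group per user
        group = list(group)
        totals = accumulate(score for _, _, score in group)
        result.extend((u, d, s, t) for (u, d, s), t in zip(group, totals))
    return result

if __name__ == "__main__":
    rows = [(1, "2009-01-08", 20), (1, "2008-11-26", 10), (4, "2009-01-01", 4),
            (1, "2009-10-08", 28), (4, "2009-02-14", 9)]
    for row in running_totals(rows):
        print(row)
```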

In [10]:
# Select and group only the needed data

ScoreDF = questionsDF \
    .filter(questionsDF.OwnerUserId != 0) \
    .groupBy(questionsDF.OwnerUserId, questionsDF.CreationDate) \
    .agg(func.sum("Score").alias("ScoreInDay"))

# Create a window partitioned by questions.OwnerUserId
# and ordered by questions.CreationDate (ascending date)

running_total_window = Window.partitionBy("OwnerUserId") \
        .orderBy("CreationDate")

# Run the aggregation over the window

totalScoreDF = ScoreDF \
    .withColumn("RTotalScore", func.sum("ScoreInDay").over(running_total_window)) \
    .sort(func.col("OwnerUserId"), func.col("CreationDate"))

totalScoreDF.show()



+-----------+------------+----------+-----------+
|OwnerUserId|CreationDate|ScoreInDay|RTotalScore|
+-----------+------------+----------+-----------+
|          1|  2008-11-26|        10|         10|
|          1|  2009-01-08|        20|         30|
|          1|  2009-10-08|        28|         58|
|          4|  2009-01-01|         4|          4|
|          4|  2009-02-14|         9|         13|
|          5|  2008-12-28|         0|          0|
|          5|  2009-04-08|        12|         12|
|         17|  2008-08-05|        14|         14|
|         23|  2008-12-16|        27|         27|
|         25|  2008-08-23|        10|         10|
|         25|  2009-04-13|        71|         81|
|         26|  2008-08-01|        26|         26|
|         26|  2008-08-14|         1|         27|
|         26|  2008-08-15|         4|         31|
|         26|  2008-09-22|         3|         34|
|         26|  2009-01-02|        10|         44|
|         26|  2009-01-09|         1|         45|
|         26|  2009-02-15|         0|         45|
|         26|  2009-03-10|         7|         52|
|         26|  2009-04-03|         0|         52|
+-----------+------------+----------+-----------+
only showing top 20 rows


### 7. Calculate each user's total score within a given time period
- Select only the needed data from the "questions" dataframe.
- Apply filtering, grouping and aggregate functions.
- The final data is stored in:
    - totalScore2DF
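
The period boundaries are written as day-month-year strings, while the CreationDate column holds dates in year-month-day form, so the boundaries must be converted before they are compared. The sketch below shows the conversion with `datetime` in plain Python; the helper names `to_iso` and `in_period` are hypothetical.

```python
from datetime import datetime

def to_iso(date_text, fmt="%d-%m-%Y"):
    """Convert a day-month-year string such as '01-09-2009' to 'YYYY-MM-DD'."""
    return datetime.strptime(date_text, fmt).date().isoformat()

def in_period(iso_date, start, end):
    """True if iso_date lies between start and end (both inclusive, day-month-year)."""
    return to_iso(start) <= iso_date <= to_iso(end)

if __name__ == "__main__":
    print(to_iso("01-09-2009"))
    print(in_period("2009-01-08", "01-01-2008", "01-09-2009"))
    print(in_period("2009-10-08", "01-01-2008", "01-09-2009"))
```

Under this format, '01-09-2009' is 1 September 2009, not 9 January 2009. This is consistent with the output of this section, where user 1 totals 30 because the question created on 2009-10-08 falls outside the period.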

In [30]:
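# Import time functions to convert the boundary dates to the format of CreationDate

from time import strptime, strftime

START = '01-01-2008'  # day-month-year: 1 January 2008
END = '01-09-2009'    # day-month-year: 1 September 2009

# Filter the dataframe by the conditions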

totalScore2DF = questionsDF \
    .filter(questionsDF.OwnerUserId != 0) \
    .filter(questionsDF.CreationDate >= strftime("%Y-%m-%d", strptime(START, "%d-%m-%Y"))) \
    .filter(questionsDF.CreationDate <= strftime("%Y-%m-%d", strptime(END, "%d-%m-%Y"))) \
    .groupBy(questionsDF.OwnerUserId) \
    .agg(func.sum("Score").alias("ScoreInTime")) \
    .sort(questionsDF.OwnerUserId)

totalScore2DF.show()

+-----------+-----------+
|OwnerUserId|ScoreInTime|
+-----------+-----------+
|          1|         30|
|          4|         13|
|          5|         12|
|         17|         14|
|         23|         27|
|         25|         81|
|         26|         52|
|         27|          9|
|         29|        234|
|         33|        222|
|         35|         25|
|         39|         16|
|         40|          7|
|         41|         16|
|         43|          1|
|         45|         24|
|         49|         22|
|         51|         30|
|         55|          2|
|         56|         41|
+-----------+-----------+
only showing top 20 rows






### 8. Find the total number of good questions
- A good question is one that has more than 5 answers.
- In the answers data, "ParentId" is the id of the question being answered.
- The final data is stored in:
    - goodQuestionDF
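
Since every answer carries the id of its question in "ParentId", the number of answers per question is simply the number of times each ParentId occurs. The plain-Python sketch below illustrates the counting and the "more than 5" threshold on made-up ids; the function name `good_questions` and the sample ids are hypothetical.

```python
from collections import Counter

def good_questions(parent_ids, min_answers=5):
    """Return (question_id, answer_count) pairs with more than min_answers answers."""
    counts = Counter(parent_ids)
    good = [(qid, n) for qid, n in counts.items() if n > min_answers]
    # Most-answered questions first, as in the sorted Spark result.
    return sorted(good, key=lambda pair: pair[1], reverse=True)

if __name__ == "__main__":
    # Question 10 has 6 answers, question 20 has 5, question 30 has 7.
    sample = [10] * 6 + [20] * 5 + [30] * 7
    print(good_questions(sample))
```

Note that "more than 5" is a strict comparison, so a question with exactly 5 answers does not qualify.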

In [31]:
# Keep only the needed columns to reduce the workload of the join

questionsidDF = questionsDF \
    .select("Id", "OwnerUserID", "Title")

answersidDF = answersDF \
    .select("ParentId", func.col("Id").alias("CommentId"))

# Set the join parameters: key and join type

joinExpr = questionsidDF.Id == answersidDF.ParentId
joinType = "inner"

# Join and filter by the conditions to get the results.
# A good question has more than 5 answers, so the comparison is strict.

goodQuestionDF = questionsidDF \
    .join(answersidDF, joinExpr, joinType) \
    .drop(answersidDF.ParentId) \
    .groupBy("Id", "OwnerUserID", "Title") \
    .agg(func.count("*").alias("TotalAnswers")) \
    .filter(func.col("TotalAnswers") > 5) \
    .sort(func.col("TotalAnswers").desc())

goodQuestionDF.show()


                                                                                

+------+-----------+--------------------+------------+
|    Id|OwnerUserID|               Title|TotalAnswers|
+------+-----------+--------------------+------------+
|406760|      22656|What's your most ...|         295|
| 38210|       1944|What non-programm...|         176|
| 23930|       1337|Factorial Algorit...|         112|
|490420|       9931|Favorite (Clever)...|          64|
|100420|       9611|Hidden Features o...|          52|
|106340|      17176|What is your favo...|          52|
|226970|      28722|What's the  best ...|          45|
|274230|      31649|What are the best...|          42|
| 24270|       2131|What's the point ...|          41|
|202750|      68336|Is there a human ...|          40|
|182600|       1533|Should one use < ...|          37|
| 84340|      11827|Why learn Perl, P...|          34|
|258740|        672|What tools are bu...|          32|
|485120|      40516|Will Emacs make m...|          32|
| 23250|       2016|When do you use t...|          30|
| 23490|       2213|What is your best...|          29|
|101070|      12293|What is an ideal ...|          28|
|424580|      25815|When foo and bar ...|          28|
|547000|      21974|Why would you cho...|          27|
|711140|      81036|Why isn't Smallta...|          27|
+------+-----------+--------------------+------------+

### 9. Find the active users
- An active user meets at least one of the following requirements:

    - Has more than 50 answers in total, or a total score greater than 500.
    - Has more than 5 answers posted on the same day the question was created.
    
- Find the two cases separately and combine them.
- The final data is stored in:
    - activeUsersDF
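
The two requirements can be checked independently and then combined as a set union, so a user who meets both is listed once. The plain-Python sketch below illustrates this on made-up data. It assumes that the second requirement means "the user posted more than 5 answers on the day the answered question was created"; that reading, the `Answer` tuple, and the function name `active_users` are assumptions for illustration.

```python
from collections import Counter, namedtuple

Answer = namedtuple("Answer", "user score parent_id date")

def active_users(answers, question_dates):
    """question_dates maps a question id to its creation date (YYYY-MM-DD)."""
    totals, scores, same_day = Counter(), Counter(), Counter()
    for a in answers:
        totals[a.user] += 1
        scores[a.user] += a.score
        # Count answers posted on the day the question itself was created.
        if question_dates.get(a.parent_id) == a.date:
            same_day[a.user] += 1
    first_case = {u for u in totals if totals[u] > 50 or scores[u] > 500}
    second_case = {u for u, n in same_day.items() if n > 5}
    return first_case | second_case

if __name__ == "__main__":
    question_dates = {q: "2009-01-01" for q in range(1, 7)}
    answers = [Answer(1, 600, 1, "2009-02-01")]                          # high total score
    answers += [Answer(2, 0, q, "2009-01-01") for q in range(1, 7)]      # 6 same-day answers
    answers += [Answer(3, 0, q, "2009-01-01") for q in range(1, 6)]      # only 5 same-day answers
    print(sorted(active_users(answers, question_dates)))
```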

In [32]:
# Solve the first case

# Filter the dataframe by the conditions to get the results

firstCaseDF = answersDF \
    .select(func.col("OwnerUserId").alias("Users"), "Score") \
    .groupBy("Users") \
    .agg(func.count("*").alias("Total comments"), func.sum("Score").alias("Total scores")) \
    .where((func.col("Total comments") > 50) | (func.col("Total scores") > 500)) \
    .select("Users")

# Solve the second case

# Keep only the needed columns to reduce the workload of the join

questionsdayDF = questionsDF \
    .select("Id", "CreationDate")

answersdayDF = answersDF \
    .select("ParentId", func.col("OwnerUserId").alias("Users"), func.col("CreationDate").alias("CommentDate"))

# Set the join parameters: key and join type

joinExpr2 = (questionsdayDF.Id == answersdayDF.ParentId) & (questionsdayDF.CreationDate == answersdayDF.CommentDate)

# Join and filter the dataframe by the conditions to get the results.
# Group by the answering user: each answer id is unique, so grouping by it always gives a count of 1.

secondCaseDF = questionsdayDF \
    .join(answersdayDF, joinExpr2, "inner") \
    .groupBy("Users") \
    .agg(func.count("*").alias("QuickAnswers")) \
    .filter(func.col("QuickAnswers") > 5) \
    .select("Users")

# Union the results of the two cases and drop users that appear in both.
# show() returns None, so it is called separately to keep the dataframe in activeUsersDF.

activeUsersDF = firstCaseDF.union(secondCaseDF).distinct()

activeUsersDF.show()

+-----+
|Users|
+-----+
|  157|
|  312|
|  357|
|  267|
|  615|
|  905|
|  184|
| 1199|
|   77|
|  893|
| 1585|
| 1659|
| 1695|
|  142|
| 1412|
| 1965|
|  714|
| 2199|
| 1968|
|  459|
+-----+
only showing top 20 rows




