# ASM1 - Phân tích hành vi và thói quen của người dùng Stack Overflow


### Import thư viện cần thiết và tạo SparkSession


In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as f
import findspark
from datetime import datetime

findspark.init()
myConf = SparkConf()    \
    .setMaster("local") \
    .setAppName("ASM1")

spark = SparkSession \
    .builder    \
    .config(conf=myConf)    \
    .config('spark.jars.packages',"org.mongodb.spark:mongo-spark-connector_2.12:10.1.1")    \
    .config("spark.mongodb.read.connection.uri","mongodb://127.0.0.1:27017/")    \
    .config("spark.mongodb.read.database","ASM1-DEP303")    \
    .getOrCreate()

### Đọc dữ liệu từ mongodb và  chuẩn hóa dữ liệu

In [3]:
# Tạo dataframe bằng read mongodb collection và thay đổi kiểu dữ liệu cho phù hợp
Questions_df = spark.read \
    .format('mongodb') \
    .option('spark.mongodb.read.collection','Questions') \
    .load() 

Questions_df = Questions_df \
    .withColumn("CreationDate", f.to_date("CreationDate")) \
    .withColumn("ClosedDate", f.to_date("ClosedDate")) \
    .withColumn("OwnerUserId", f.col("OwnerUserId").cast("int")) \
    .drop("_id")

Questions_df.printSchema()

Answers_df = spark.read \
    .format('mongodb') \
    .option('spark.mongodb.read.collection','Answers') \
    .load()
Answers_df = Answers_df \
    .withColumn("CreationDate",to_date("CreationDate")) \
    .withColumn("OwnerUserId",f.col("OwnerUserId").cast("int")) \
    .drop("_id")

Answers_df.printSchema()


root
 |-- Body: string (nullable = true)
 |-- ClosedDate: date (nullable = true)
 |-- CreationDate: date (nullable = true)
 |-- Id: integer (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Title: string (nullable = true)

root
 |-- Body: string (nullable = true)
 |-- CreationDate: date (nullable = true)
 |-- Id: integer (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- ParentId: integer (nullable = true)
 |-- Score: integer (nullable = true)



### Yêu cầu 1: Tính số lần xuất hiện của các ngôn ngữ lập trình

In [4]:
# Tạo list ngôn ngữ cần đếm

languages = ['Java', 'Python', 'C\+\+', 'C#', 'Go', 'Ruby', 'Javascript', 'PHP', 'HTML', 'CSS', 'SQL']

# Split Questions.Body theo khoảng trắng và dùng regex để lấy tên ngôn ngữ cần đếm
for i, name in enumerate(languages) :
    getName = Questions_df \
        .withColumn("explodeBody", f.explode(f.split(f.col("Body"), " "))) \
        .select(f.regexp_extract(f.col("explodeBody"), rf"({name})" , 1).alias("Programing Language")) \
        .filter(f.col("Programing Language") != "") \
        .groupBy("Programing Language") \
        .agg(f.count("*").alias("Count"))
    if i == 0:
        totalLanguageDF = getName
    else:
        totalLanguageDF = totalLanguageDF.unionByName(getName, allowMissingColumns=True)
        
totalLanguageDF.show()

+-------------------+------+
|Programing Language| Count|
+-------------------+------+
|               Java|103531|
|             Python| 43625|
|                C++| 28828|
|                 C#| 32396|
|                 Go| 78057|
|               Ruby| 16006|
|         Javascript| 14799|
|                PHP| 62855|
|               HTML| 88923|
|                CSS| 33388|
|                SQL|140739|
+-------------------+------+



### Yêu cầu 2 : Tìm các domain được sử dụng nhiều nhất trong các câu hỏi

In [5]:
# Split Questions.Body theo khoảng trắng, dùng regex để lấy tên domain và dùng count để đếm tên domain
totalDomain = Questions_df \
        .withColumn("explodeBody", f.explode(f.split(f.col("Body"), " "))) \
        .select(f.regexp_extract(f.col("explodeBody"), r'https?://(\S+?/)' , 1).alias("Domain")) \
        .filter(f.col("Domain") != "") \
        .groupBy("Domain") \
        .agg(f.count("*").alias("Count")) \
        .sort(f.col("Count").desc())

totalDomain.show()

+--------------------+------+
|              Domain| Count|
+--------------------+------+
|  i.stack.imgur.com/|125351|
|  stackoverflow.com/| 56836|
|       jsfiddle.net/| 34808|
|         github.com/| 34728|
|         www.w3.org/| 17716|
|schemas.android.com/| 14465|
|www.springframewo...| 11636|
| msdn.microsoft.com/|  8909|
|          localhost/|  6383|
|   en.wikipedia.org/|  5452|
|       pastebin.com/|  5431|
|schemas.microsoft...|  5414|
|       java.sun.com/|  5324|
|ajax.googleapis.com/|  5248|
|        example.com/|  4399|
|    code.google.com/|  4274|
|        i.imgur.com/|  4134|
|developers.google...|  3798|
|developer.android...|  3662|
|     localhost:8080/|  3533|
+--------------------+------+
only showing top 20 rows



### Yêu cầu 3 : Tính tổng điểm của User theo từng ngày

In [6]:
# Tính tổng điểm của từng Id theo từng ngày
score_df = Questions_df    \
    .filter(Questions_df.OwnerUserId != 0)    \
    .groupBy("OwnerUserId","CreationDate")    \
    .agg(f.sum("Score").alias("TotalScoreInDay"))
   
# Tạo Windowing để tính tổng điểm của từng Id qua các ngày
running_total_window = Window.partitionBy("OwnerUserId")    \
    .orderBy("CreationDate")

# Chạy câu truy vấn theo windowing
totalscore_df = score_df.withColumn("TotalScore",f.sum("TotalScoreInDay").over(running_total_window))    \
    .sort("OwnerUserId","CreationDate")

totalscore_df.show()

+-----------+------------+---------------+----------+
|OwnerUserId|CreationDate|TotalScoreInDay|TotalScore|
+-----------+------------+---------------+----------+
|          1|  2008-11-26|             10|        10|
|          1|  2009-01-08|             20|        30|
|          1|  2009-10-08|             28|        58|
|          4|  2009-01-01|              4|         4|
|          4|  2009-02-14|              9|        13|
|          4|  2010-07-02|             66|        79|
|          5|  2008-12-28|              0|         0|
|          5|  2009-04-08|             12|        12|
|          5|  2011-03-28|             11|        23|
|          5|  2011-04-06|              2|        25|
|          9|  2012-01-19|              2|         2|
|         17|  2008-08-05|             14|        14|
|         17|  2010-09-05|              1|        15|
|         17|  2011-01-27|              0|        15|
|         20|  2010-09-22|              2|         2|
|         20|  2011-04-21|  

### Yêu cầu 4: Tính tổng số điểm mà User đạt được trong một khoảng thời gian

In [7]:
START = '01-01-2008'
END = '01-01-2009'

# Filter dataframe theo điều kiện
totalscore2_df = Questions_df.filter(f.col("CreationDate") > datetime.strptime(START, '%d-%m-%Y'))    \
    .filter(f.col("CreationDate") < datetime.strptime(END, '%d-%m-%Y'))    \
    .filter(Questions_df.OwnerUserId != 0)    \
    .groupBy("OwnerUserId") \
    .agg(f.sum("Score").alias("TotalScore"))    \
    .sort("OwnerUserId")
totalscore2_df.show()

+-----------+----------+
|OwnerUserId|TotalScore|
+-----------+----------+
|          1|        10|
|          5|         0|
|         17|        14|
|         23|        27|
|         25|        10|
|         26|        34|
|         27|         9|
|         29|       206|
|         33|       222|
|         35|        25|
|         39|        16|
|         40|         7|
|         41|        16|
|         45|        12|
|         49|        22|
|         51|        30|
|         56|        28|
|         58|       171|
|         60|        22|
|         61|        63|
+-----------+----------+
only showing top 20 rows



### Yêu cầu 5: Tìm các câu hỏi có nhiều câu trả lời

In [8]:
# Tạo dataframe gồm các data cần thiết
questionsid_df = Questions_df.select("Id","OwnerUserId","Title")
answersid_df = Answers_df.select("ParentId")

# Tạo key và kiểu join
joinexpr = questionsid_df.Id == answersid_df.ParentId
join_type = "inner"

goodQuestions_df = questionsid_df.join(answersid_df,joinexpr,join_type)    \
    .groupBy("Id","OwnerUserId","Title")    \
    .agg(f.count("*").alias("TotalAnswers"))    \
    .filter(f.col("TotalAnswers") >= 5)    \
    .sort(f.col("TotalAnswers").desc())
goodQuestions_df.show()

+--------+-----------+--------------------+------------+
|      Id|OwnerUserId|               Title|TotalAnswers|
+--------+-----------+--------------------+------------+
|  406760|      22656|What's your most ...|         408|
|   38210|       1944|What non-programm...|         316|
|   23930|       1337|Factorial Algorit...|         129|
|  100420|       9611|Hidden Features o...|         100|
|   40480|       4315|Is Java "pass-by-...|          69|
|  490420|       9931|Favorite (Clever)...|          67|
|  106340|      17176|What is your favo...|          61|
| 2155930|      65393|Fixing "The break...|          59|
|  226970|      28722|What's the  best ...|          55|
|  202750|      68336|Is there a human ...|          51|
| 1218390|     106140|What is your most...|          50|
|17054000|    1489990|"cannot resolve s...|          49|
|   24270|       2131|What's the point ...|          45|
|  173400|       5291|How to check if P...|          43|
|  274230|      31649|What are 

### Yêu cầu 6: Tìm các Active User

In [9]:
# Gồm 2 điều kiện
# 1. Có nhiều hơn 50 câu trả lời hoặc tổng số điểm đạt được khi trả lời lớn hơn 500
firstCase_df = Answers_df    \
    .select(f.col("OwnerUserId").alias("User"),"Score")    \
    .groupBy("User")    \
    .agg(f.count("*").alias("TotalAnswers"),f.sum("Score").alias("TotalScore"))    \
    .where((f.col("TotalAnswers") > 50) | (f.col("TotalScore") > 500))    \
    .select("User")

# 2.Có nhiều hơn 5 câu trả lời ngay trong ngày câu hỏi được tạo
questionsdayDF = Questions_df \
    .select("Id", "CreationDate")
answersdayDF = Answers_df \
    .select("ParentId","Id","CreationDate")
joinexpr2 = questionsdayDF.Id == answersdayDF.ParentId
secondCase_df = questionsdayDF    \
    .join(answersdayDF,joinexpr2,"inner")    \
    .where((questionsdayDF.CreationDate) == (answersdayDF.CreationDate))    \
    .groupBy(questionsdayDF.Id)    \
    .agg(f.count("*").alias("QuickAnswers"))    \
    .filter(f.col("QuickAnswers") > 5)    \
    .select("Id")
# Union 2 case 
activeUsersDF = firstCase_df.union(secondCase_df).show()

+------+
|  User|
+------+
| 17389|
| 74757|
| 91299|
|175201|
|230513|
| 14148|
| 17172|
| 42304|
| 49197|
|201725|
|207421|
|205426|
| 17028|
| 39742|
| 97385|
| 98654|
|109035|
|127440|
|147463|
|139459|
+------+
only showing top 20 rows

