# Spark Context

In [1]:
from pyspark.sql import SparkSession

# Spark session & context
spark = (SparkSession
         .builder
         .master("local")
         .appName("spark-sql")
         .getOrCreate())
sc = spark.sparkContext

24/06/20 09:08:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/20 09:08:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .load("/home/jovyan/data/movies.csv")
)

In [3]:
df.summary()

DataFrame[summary: string, movieId: string, title: string, genres: string]

In [4]:
df.show(10, False)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
|6      |Heat (1995)                       |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                    |Comedy|Romance                             |
|8      |Tom and Huck (1995)               |Adventure|Children                         |
|9      |Sudden Death

In [5]:
df.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [6]:
df.createOrReplaceTempView("movies")

In [7]:
n_df = spark.sql("SELECT DISTINCT(genres) FROM movies")
n_df.show(30, False)

+-------------------------------------------------+
|genres                                           |
+-------------------------------------------------+
|Comedy|Horror|Thriller                           |
|Adventure|Sci-Fi|Thriller                        |
|Action|Adventure|Drama|Fantasy                   |
|Action|Drama|Horror                              |
|Action|Animation|Comedy|Sci-Fi                   |
|Animation|Children|Drama|Musical|Romance         |
|Action|Adventure|Drama                           |
|Adventure|Sci-Fi                                 |
|Documentary|Musical|IMAX                         |
|Adventure|Children|Fantasy|Sci-Fi|Thriller       |
|Adventure|Animation                              |
|Musical|Romance|War                              |
|Action|Adventure|Fantasy|Romance                 |
|Adventure|Children|Drama|Fantasy|IMAX            |
|Comedy|Crime|Horror|Thriller                     |
|Crime|Drama|Fantasy|Horror|Thriller              |
|Comedy|Myst

In [8]:
spark.sql("SELECT title FROM movies WHERE movieId > 100").show(10, False)

+------------------------------------------+
|title                                     |
+------------------------------------------+
|Bottle Rocket (1996)                      |
|Mr. Wrong (1996)                          |
|Unforgettable (1996)                      |
|Happy Gilmore (1996)                      |
|Bridges of Madison County, The (1995)     |
|Nobody Loves Me (Keiner liebt mich) (1994)|
|Muppet Treasure Island (1996)             |
|Catwalk (1996)                            |
|Braveheart (1995)                         |
|Taxi Driver (1976)                        |
+------------------------------------------+
only showing top 10 rows



In [9]:
spark.sql("""
    SELECT movieId, genres
    FROM movies
    GROUP BY genres, movieId"""
    ).where("movieId > 10"
    ).count()

                                                                                

9732

## RDD with partitions

In [12]:
rdd1 = sc.textFile("./data/movies.csv")
rdd2 = sc.textFile("./data/ratings.csv")

In [13]:

print(f"defaultMinPartitions: {sc.defaultMinPartitions}")
print(f"name: rdd1, count: {rdd1.count}, partitions: {rdd1.getNumPartitions()}")
print(f"name: rdd2, count: {rdd2.count}, partitions: {rdd2.getNumPartitions()}")


defaultMinPartitions: 1
name: rdd1, count: <bound method RDD.count of ./data/movies.csv MapPartitionsRDD[46] at textFile at NativeMethodAccessorImpl.java:0>, partitions: 1
name: rdd2, count: <bound method RDD.count of ./data/ratings.csv MapPartitionsRDD[48] at textFile at NativeMethodAccessorImpl.java:0>, partitions: 1


## Spark SQL

### DE 연봉

In [14]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
import pyspark.sql.functions as F

# JSON 데이터의 스키마 정의
salaries_schema = StructType([
    StructField('company_location', StringType(), True),
    StructField('company_size', StringType(), True),
    StructField('employee_residence', StringType(), True),
    StructField('employment_type', StringType(), True),
    StructField('experience_level', StringType(), True),
    StructField('job_title', StringType(), True),
    StructField('remote_ratio', IntegerType(), True),
    StructField('salary', StringType(), True),  # Initially as StringType
    StructField('salary_currency', StringType(), True),
    StructField('salary_in_usd', StringType(), True),  # Initially as StringType
    StructField('work_year', StringType(), True)
])

# JSON 파일 읽기
players_df = spark.read.schema(player_schema).json('path/to/your/players_data.json')

# DataFrame의 상위 10개 행을 표시
players_df.show(10, False)

+----------------+------------+------------------+---------------+----------------+------------------------------+------------+------+---------------+-------------+---------+
|company_location|company_size|employee_residence|employment_type|experience_level|job_title                     |remote_ratio|salary|salary_currency|salary_in_usd|work_year|
+----------------+------------+------------------+---------------+----------------+------------------------------+------------+------+---------------+-------------+---------+
|US              |M           |US                |FT             |EX              |Data Science Director         |0           |212000|USD            |212000       |2023     |
|US              |M           |US                |FT             |EX              |Data Science Director         |0           |190000|USD            |190000       |2023     |
|GB              |M           |GB                |FT             |MI              |Business Intelligence Engineer|0          

In [15]:
# 스키마 자동 추론
raw_df = spark.read.json('./data/salaries.json')

# 상위 10개 행을 출력
raw_df.printSchema()
raw_df.show(10, False)

root
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- remote_ratio: long (nullable = true)
 |-- salary: string (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: string (nullable = true)
 |-- work_year: string (nullable = true)

+----------------+------------+------------------+---------------+----------------+------------------------------+------------+------+---------------+-------------+---------+
|company_location|company_size|employee_residence|employment_type|experience_level|job_title                     |remote_ratio|salary|salary_currency|salary_in_usd|work_year|
+----------------+------------+------------------+---------------+----------------+------------------------------+------------+------+---------------+---

### 쇼핑몰 리뷰

In [3]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import pyspark.sql.functions as F

# 고객 데이터 스키마 정의
customers_schema = StructType([
    StructField('Id', IntegerType(), True),
    StructField('NickName', StringType(), True)
])

# 고객 데이터 로드
customers_df = spark.read.schema(customers_schema).json('./data/customers.json')

# 데이터프레임 확인
customers_df.show(10, False)

+------+----------------+
|Id    |NickName        |
+------+----------------+
|103603|1000kgthanh     |
|103760|999999999ok     |
|103829|ac7ive          |
|1     |admin           |
|103839|ahkk.nguyen     |
|103981|akyshin         |
|103863|alex.tran.7792  |
|103694|ali33va40tencuop|
|103547|alice.mi.39     |
|103523|all4u4me        |
+------+----------------+
only showing top 10 rows



In [4]:
# 제품 데이터 스키마 정의
products_schema = StructType([
    StructField('Id', IntegerType(), True),
    StructField('Name', StringType(), True),
    StructField('UnitPrice', StringType(), True)
])

# 제품 데이터 로드
products_df = spark.read.schema(products_schema).json('./data/products.json')

# 데이터프레임 확인
products_df.show(10, False)

+---+----------------------------------------------+---------+
|Id |Name                                          |UnitPrice|
+---+----------------------------------------------+---------+
|1  |Build your own computer                       |1200     |
|2  |Digital Storm VANQUISH 3 Custom Performance PC|1259     |
|3  |Lenovo IdeaCentre 600 All-in-One PC           |500      |
|4  |Apple MacBook Pro 13-inch                     |1800     |
|5  |Asus N551JK-XO076H Laptop                     |1500     |
|6  |Samsung Series 9 NP900X4C Premium Ultrabook   |1590     |
|7  |HP Spectre XT Pro UltraBook                   |1350     |
|8  |HP Envy 6-1180ca 15.6-Inch Sleekbook          |1460     |
|9  |Lenovo Thinkpad X1 Carbon Laptop              |1360     |
|10 |Adobe Photoshop CS4                           |75       |
+---+----------------------------------------------+---------+
only showing top 10 rows



In [7]:
# 평가 데이터 스키마 정의
ratings_schema = StructType([
    StructField('CustomerID', IntegerType(), True),
    StructField('ProductID', IntegerType(), True),
    StructField('Rate', IntegerType(), True),
    StructField('CreateDate', StringType(), True)
])

# 평가 데이터 로드
ratings_df = spark.read.schema(ratings_schema).json('./data/new_ratings.json')  # 올바른 파일 경로로 수정 필요

# 데이터프레임 확인
ratings_df.show(10, False)

+----------+---------+----+-------------------+
|CustomerID|ProductID|Rate|CreateDate         |
+----------+---------+----+-------------------+
|103416    |619      |1   |2018/01/01 01:36:30|
|103654    |411      |1   |2018/01/01 01:36:35|
|103954    |298      |3   |2018/01/01 01:36:38|
|103672    |361      |5   |2018/01/01 01:37:15|
|103960    |536      |5   |2018/01/01 02:36:25|
|103372    |481      |2   |2018/01/01 02:36:32|
|103444    |132      |1   |2018/01/01 02:36:34|
|103831    |41       |1   |2018/01/01 02:36:41|
|103541    |498      |5   |2018/01/01 02:36:50|
|103819    |155      |4   |2018/01/01 02:37:10|
+----------+---------+----+-------------------+
only showing top 10 rows



In [22]:
# 고객 데이터와 평가 데이터 병합
merged_df = ratings_df.join(customers_df, ratings_df.CustomerID == customers_df.Id, 'inner') \
                      .select(ratings_df['*'], customers_df['NickName'])

# 병합된 데이터와 제품 데이터 병합
final_df = merged_df.join(products_df, merged_df.ProductID == products_df.Id, 'inner') \
                    .select(merged_df['*'], products_df['Name'], products_df['UnitPrice'])

# 최종 데이터프레임 확인
final_df.show(10, False)

+----------+---------+----+-------------------+----------------+---------------------------------------------------------------------------------------------------------------------------+---------+
|CustomerID|ProductID|Rate|CreateDate         |NickName        |Name                                                                                                                       |UnitPrice|
+----------+---------+----+-------------------+----------------+---------------------------------------------------------------------------------------------------------------------------+---------+
|103416    |619      |1   |2018/01/01 01:36:30|vanduong0403    |Le Vian Chocolate Diamonds 1/4 ct tw Earrings 14K Honey Gold                                                               |1049     |
|103654    |411      |1   |2018/01/01 01:36:35|thutruc.huynh.3 |Diamond Heart Necklace 1/10 ct tw Round-Cut 10K Yellow Gold 18"                                                            |299.99   |
|1039

In [11]:
# 각 고객이 평가한 제품의 평균 평점 계산
average_ratings = final_df.groupBy('NickName').agg(F.avg('Rate').alias('AverageRating'))

print('===========각 고객별 평가한 제품의 평균 평점===========')

# 결과 확인
average_ratings.show(10, False)

+------------------+------------------+
|NickName          |AverageRating     |
+------------------+------------------+
|chausoco          |4.0               |
|Kojumi            |5.0               |
|trannhatvy        |5.0               |
|ruud0407          |2.5081521739130435|
|lenam.uit         |4.006306937631394 |
|thanh.cule        |5.0               |
|nguyettram        |3.375912408759124 |
|NguyenVuNhatChuong|4.0               |
|ptkiuee           |4.0               |
|tuonglam.dang     |3.6470588235294117|
+------------------+------------------+
only showing top 10 rows



### 수업 예시 코드

In [71]:
from pyspark.sql.types import StructType, StructField, StringType, LongType #사용할 칼럼을 잡아서 설정
import pyspark.sql.functions as F

actor_schema = StructType([
    StructField('login', StringType(), True),
    StructField('url', StringType(), True)
])

payload_schema = StructType([
    StructField('repository_id', LongType(), True),
    StructField('size', LongType(), True),
    StructField('distinct_size', LongType(), True),
    StructField('message', StringType(), True)
])

repo_schema = StructType([
    StructField('name', StringType(), True),
    StructField('url', StringType(), True)
])

new_df = github.withColumn('actor_json', F.from_json('actor', actor_schema)) \
               .select('created_at', 'id', 'payload', 'type', 'actor_json.*', 'repo')
new_df = new_df.withColumn('payload_json', F.from_json('payload', payload_schema)) \
               .select('login', 'url', 'created_at', 'id', 'payload_json.*', 'type', 'repo')
new_df = new_df.withColumn('repo_json', F.from_json('repo', repo_schema)) \
               .select('login', 'url', 'created_at', 'id', 'repository_id', 'size', 'distinct_size', 'message', 'type', 'repo_json.*')
new_df.show(10, False)

+-------------------+------------------------------------------------+--------------------+-----------+-------------+----+-------------+-------+-----------+----------------------------------------+---------------------------------------------------------------------+
|login              |url                                             |created_at          |id         |repository_id|size|distinct_size|message|type       |name                                    |url                                                                  |
+-------------------+------------------------------------------------+--------------------+-----------+-------------+----+-------------+-------+-----------+----------------------------------------+---------------------------------------------------------------------+
|Drunkula           |https://api.github.com/users/Drunkula           |2024-05-19T14:00:00Z|38509489462|489717552    |1   |1            |null   |PushEvent  |Drunkula/twitchtoolsglitch              


|login     |url    |created_at    |id   |repository_id|size|distinct_size|message|type  |
| --- | ---| ---| --- | --- | --- | --- | --- | --- |
| String | String | DateTime | Long | Long | Int | Int | String | String |

Top 50 Push repositories
Top 50 Commit repositories



### 샐러리 & 쇼핑 데이터 활용

In [21]:
final_df.printSchema()
print("=========================================================")
salaries_df.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- Rate: integer (nullable = true)
 |-- CreateDate: string (nullable = true)
 |-- NickName: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- UnitPrice: string (nullable = true)

root
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- remote_ratio: integer (nullable = true)
 |-- salary: long (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: long (nullable = true)
 |-- work_year: string (nullable = true)



In [22]:
final_df.show(10,False)
print("=========================================================")
salaries_df.show(10,False)

+----------+---------+----+-------------------+----------------+---------------------------------------------------------------------------------------------------------------------------+---------+
|CustomerID|ProductID|Rate|CreateDate         |NickName        |Name                                                                                                                       |UnitPrice|
+----------+---------+----+-------------------+----------------+---------------------------------------------------------------------------------------------------------------------------+---------+
|103416    |619      |1   |2018/01/01 01:36:30|vanduong0403    |Le Vian Chocolate Diamonds 1/4 ct tw Earrings 14K Honey Gold                                                               |1049     |
|103654    |411      |1   |2018/01/01 01:36:35|thutruc.huynh.3 |Diamond Heart Necklace 1/10 ct tw Round-Cut 10K Yellow Gold 18"                                                            |299.99   |
|1039

### 수업 예시 데이터

In [72]:
# filter github action bot
new_df = new_df.filter(col("login") != "github-actions[bot]")
new_df = new_df.withColumn('created_at', F.trim(F.regexp_replace(new_df.created_at, "[TZ]", " ")))
new_df = new_df.withColumn('created_dt', F.to_timestamp(new_df.created_at, 'yyyy-MM-dd HH:mm:ss'))
new_df.show(10, False)

+------------+-----------------------------------------+-------------------+-----------+-------------+----+-------------+-------+-----------+----------------------------------------+---------------------------------------------------------------------+-------------------+
|login       |url                                      |created_at         |id         |repository_id|size|distinct_size|message|type       |name                                    |url                                                                  |created_dt         |
+------------+-----------------------------------------+-------------------+-----------+-------------+----+-------------+-------+-----------+----------------------------------------+---------------------------------------------------------------------+-------------------+
|Drunkula    |https://api.github.com/users/Drunkula    |2024-05-19 14:00:00|38509489462|489717552    |1   |1            |null   |PushEvent  |Drunkula/twitchtoolsglitch              

In [73]:
new_df.printSchema()

root
 |-- login: string (nullable = true)
 |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- repository_id: long (nullable = true)
 |-- size: long (nullable = true)
 |-- distinct_size: long (nullable = true)
 |-- message: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- url: string (nullable = true)
 |-- created_dt: timestamp (nullable = true)



In [64]:
def check_repo_name(name):
    sp = name.split("/")
    if not sp:
        return name
    else:
        return sp[-1]
    
udf_check_repo_name = F.udf(check_repo_name, StringType()) 

In [None]:
# @F.udf(returnType=StringType())
# def check_repo_name(val):
#     sp = name.split("/")
#     if not sp:
#         return name
#     else:
#         return sp[-1]

In [None]:
# F.udf(lambda name => name.split("/")[-1], StringType()) 펑션 만드는 방법 2개 가져옴.

In [83]:
new_df = new_df.withColumn('repo_name', udf_check_repo_name(F.col('name')))
new_df.show(10, False)

+------------+-----------------------------------------+-------------------+-----------+-------------+----+-------------+-------+-----------+----------------------------------------+---------------------------------------------------------------------+-------------------+---------------------------------+
|login       |url                                      |created_at         |id         |repository_id|size|distinct_size|message|type       |name                                    |url                                                                  |created_dt         |repo_name                        |
+------------+-----------------------------------------+-------------------+-----------+-------------+----+-------------+-------+-----------+----------------------------------------+---------------------------------------------------------------------+-------------------+---------------------------------+
|Drunkula    |https://api.github.com/users/Drunkula    |2024-05-19 14:00:00|385

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


In [85]:
new_df.count()

                                                                                

171662

In [29]:
final_df.count()

130754

In [30]:
salaries_df.count()

8805

In [84]:
new_df.agg(F.countDistinct('repo_name')).show(5, False)



+----------------+
|count(repo_name)|
+----------------+
|50439           |
+----------------+




                                                                                

In [31]:
final_df.agg(F.countDistinct('NickName')).show(5, False)

+---------------+
|count(NickName)|
+---------------+
|344            |
+---------------+



In [32]:
salaries_df.agg(F.countDistinct('company_location')).show(5, False)

+-----------------------+
|count(company_location)|
+-----------------------+
|74                     |
+-----------------------+



In [88]:
# Window
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

w = Window.partitionBy("repo_name").orderBy(F.desc(col("size")))
new_df.withColumn("row", row_number().over(w)) \
      .filter(col("row") == 1).drop("row") \
      .count()

                                                                                

50439

## Data Model

1. Top 10 Repo
- id
- @timestamp
- repo_url
- repo_name
- push_count
- commit_count
- pr_count
- fork_count
- issue_count
- watch_count

2. Top 10 User
- id
- @timestamp
- user_name
- push_count
- commit_count
- pr_count
- issue_count
- issue_comment_count

3. Daily Stats
- id
- @timestamp
- distinct_user_cnt
- distinct_repo_cnt
- push_count
- commit_count
- pr_count
- issue_count
- issue_comment_count
- release_count

In [36]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

salaries_df.groupBy("job_title").agg(avg("salary_in_usd").alias("average_salary")).show()


+--------------------+------------------+
|           job_title|    average_salary|
+--------------------+------------------+
|Business Intellig...|       126253.1875|
|  Lead Data Engineer|139230.33333333334|
|        AI Architect|          250328.0|
|        Data Modeler|          128400.2|
|Data Visualizatio...|          122275.0|
| Data Scientist Lead|          136153.0|
|  Decision Scientist|166094.63157894736|
|Principal Data Ar...|           38154.0|
|Head of Machine L...|          198103.0|
|Machine Learning ...|188440.26666666666|
|Data Analytics Sp...|           95000.0|
|     Data Specialist|          103102.5|
|  Sales Data Analyst|           60000.0|
|Data Operations E...|         133431.25|
|Data Science Prac...|          144480.0|
|Data Operations M...|          136000.0|
| Data Analytics Lead|         162533.75|
|  Power BI Developer|           64781.0|
|Deep Learning Res...|          124163.0|
|Consultant Data E...|          118539.0|
+--------------------+------------

In [38]:

salaries_df.groupBy("experience_level").agg(avg("salary_in_usd").alias("average_salary")).show()

+----------------+------------------+
|experience_level|    average_salary|
+----------------+------------------+
|              EX|189052.61710037175|
|              MI| 114681.0213625866|
|              EN| 87676.76282051283|
|              SE|161889.01057449495|
+----------------+------------------+



In [37]:
salaries_df.groupBy("company_location").agg(avg("salary_in_usd").alias("average_salary")).show()

+----------------+------------------+
|company_location|    average_salary|
+----------------+------------------+
|              LT|           97611.0|
|              DZ|          100000.0|
|              FI| 68519.33333333333|
|              UA|121333.33333333333|
|              RO|50950.666666666664|
|              NL|           77956.2|
|              BS|           45555.0|
|              PL|          53923.75|
|              AM|           50000.0|
|              MX| 94864.63636363637|
|              EE|           46706.1|
|              CN|          100000.0|
|              AT| 71354.83333333333|
|              RU| 78207.85714285714|
|              IQ|          100000.0|
|              AD|           50745.0|
|              HR|           76726.0|
|              CZ| 69478.66666666667|
|              PT|49787.083333333336|
|              GH|           27000.0|
+----------------+------------------+
only showing top 20 rows



In [39]:
salaries_df.groupBy("remote_ratio").agg(avg("salary_in_usd").alias("average_salary")).show()

+------------+------------------+
|remote_ratio|    average_salary|
+------------+------------------+
|         100|144149.24196482718|
|          50| 82162.11926605504|
|           0|155592.48194365663|
+------------+------------------+



In [12]:
final_df.groupBy("Name").agg(F.avg("Rate").alias("average_rating")).show()

+--------------------+------------------+
|                Name|    average_rating|
+--------------------+------------------+
|Diamond Necklace ...|3.1396648044692737|
|     Dip Dye Sweater|3.2151162790697674|
|Le Vian Diamond R...|3.2032967032967035|
|Converse đen cổ thấp| 3.159779614325069|
|Flower Earrings 1...|3.1604938271604937|
|Disney Treasures ...|3.3248730964467006|
|  Vải Thiều Thanh Hà|3.2763819095477387|
|Hoop Earrings 14K...| 3.143617021276596|
|Unstoppable Love ...| 3.277511961722488|
|The Gangster, The...|3.0757575757575757|
|     Science & Faith|3.2254901960784315|
|         Xiaomi Mi 8|3.1707317073170733|
|Hallmark Diamonds...|3.1129943502824857|
|    Paper Coffee Cup|3.1837837837837837|
|    Levi's 511 Jeans| 3.263157894736842|
|Basic Jogger In B...|3.1508379888268156|
|Disney Treasures ...|3.1510416666666665|
|Le Bleu Homestay ...| 3.020942408376963|
|Samsung Galaxy Z ...| 3.098360655737705|
|Diamond Heart Nec...| 3.187192118226601|
+--------------------+------------

In [43]:
final_df.groupBy("ProductID").agg(F.count("Rate").alias("review_count")).show()

+---------+------------+
|ProductID|review_count|
+---------+------------+
|      148|         186|
|      471|         179|
|      496|         176|
|      463|         177|
|      540|         184|
|      243|         192|
|      392|         189|
|      623|         176|
|       31|         184|
|      516|         201|
|      137|         193|
|      451|         201|
|      251|         198|
|       85|         192|
|      580|         199|
|      458|         184|
|       65|         167|
|      481|         199|
|      588|         204|
|      255|         203|
+---------+------------+
only showing top 20 rows



24/06/19 17:08:21 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 2931050 ms exceeds timeout 120000 ms
24/06/19 17:08:21 WARN SparkContext: Killing executors is not supported by current scheduler.


In [89]:
new_df.select("type").distinct().show()

                                                                                

+--------------------+
|                type|
+--------------------+
|PullRequestReview...|
|           PushEvent|
|         GollumEvent|
|        ReleaseEvent|
|  CommitCommentEvent|
|         CreateEvent|
|PullRequestReview...|
|   IssueCommentEvent|
|         DeleteEvent|
|         IssuesEvent|
|           ForkEvent|
|         PublicEvent|
|         MemberEvent|
|          WatchEvent|
|    PullRequestEvent|
+--------------------+



In [24]:
customer_avg_rating_df = final_df.groupBy("CustomerID").agg(F.avg("Rate").alias("average_rating"))
customer_avg_rating_df.show()

+----------+------------------+
|CustomerID|    average_rating|
+----------+------------------+
|    103747|               4.0|
|    103638|               3.0|
|    103767|               5.0|
|    103988|               5.0|
|    103713|               1.0|
|    103541| 3.821852731591449|
|    103970|               3.0|
|    103506| 3.699099099099099|
|    103883| 4.371373307543521|
|    103880|2.3123359580052494|
|    103962|2.9343629343629343|
|    103648|               5.0|
|    103755|               2.0|
|     15221|               1.0|
|    103476|2.5081521739130435|
|    103698|               1.0|
|    103876|               4.0|
|    103309| 3.947611710323575|
|    103568|               2.0|
|    103446|               3.0|
+----------+------------------+
only showing top 20 rows



In [19]:
# CreateDate 컬럼을 날짜와 시간으로 분리
final_df = final_df.withColumn("Date", F.to_date("CreateDate", "yyyy/MM/dd HH:mm:ss"))
final_df = final_df.withColumn("Hour", F.hour("CreateDate"))

final_df.show(10)

+----------+---------+----+-------------------+----------------+--------------------+---------+----------+----+
|CustomerID|ProductID|Rate|         CreateDate|        NickName|                Name|UnitPrice|      Date|Hour|
+----------+---------+----+-------------------+----------------+--------------------+---------+----------+----+
|    103416|      619|   1|2018/01/01 01:36:30|    vanduong0403|Le Vian Chocolate...|     1049|2018-01-01|null|
|    103654|      411|   1|2018/01/01 01:36:35| thutruc.huynh.3|Diamond Heart Nec...|   299.99|2018-01-01|null|
|    103954|      298|   3|2018/01/01 01:36:38|    ha.n.hien.75|Heart Ring 1/5 ct...|      479|2018-01-01|null|
|    103672|      361|   5|2018/01/01 01:37:15|   nam.kimcham.1|Neil Lane Diamond...|   719.99|2018-01-01|null|
|    103960|      536|   5|2018/01/01 02:36:25|        tuanpkna|Disney Treasures ...|   449.99|2018-01-01|null|
|    103372|      481|   2|2018/01/01 02:36:32|      damphuhanh|Amethyst Heart Ne...|   599.99|2018-01-0

In [25]:
nickname_purchase_count_df = final_df.groupBy("NickName").agg(F.count("ProductID").alias("purchase_count"))
nickname_purchase_count_df.show()

+------------------+--------------+
|          NickName|purchase_count|
+------------------+--------------+
|          chausoco|           158|
|            Kojumi|           229|
|        trannhatvy|           273|
|          ruud0407|           368|
|         lenam.uit|          1427|
|        thanh.cule|           157|
|        nguyettram|           274|
|NguyenVuNhatChuong|           187|
|           ptkiuee|           144|
|     tuonglam.dang|          1088|
|           thuattq|           406|
|     dang.thang.31|           395|
|          phan.nhu|           117|
|            hhoahi|           135|
|        hoaquachsd|           158|
|         zeatop939|           289|
|    huong.truongmy|           265|
|          belimoon|           183|
|            ac7ive|           100|
|         daomandat|           746|
+------------------+--------------+
only showing top 20 rows



In [29]:
total_revenue_df = final_df.groupBy("Name").agg(F.sum("UnitPrice").alias("total_revenue"))
total_revenue_df.show()

+--------------------+------------------+
|                Name|     total_revenue|
+--------------------+------------------+
|Diamond Necklace ...|          125121.0|
|     Dip Dye Sweater|            2924.0|
|Le Vian Diamond R...|236598.17999999947|
|Converse đen cổ thấp|            7986.0|
|Flower Earrings 1...| 49246.37999999997|
|Disney Treasures ...|               0.0|
|  Vải Thiều Thanh Hà| 2187.009999999999|
|Hoop Earrings 14K...| 37598.12000000007|
|Unstoppable Love ...|522497.90999999864|
|The Gangster, The...|             198.0|
|     Science & Faith|               0.0|
|         Xiaomi Mi 8|          114800.0|
|Hallmark Diamonds...|23008.230000000058|
|    Paper Coffee Cup|              92.5|
|    Levi's 511 Jeans|            7438.5|
|Basic Jogger In B...|            3580.0|
|Disney Treasures ...|63358.079999999885|
|Le Bleu Homestay ...| 5737.639999999994|
|Samsung Galaxy Z ...|           67344.0|
|Diamond Heart Nec...| 40597.97000000004|
+--------------------+------------

In [30]:
high_rated_products = final_df.filter(final_df["Rate"] >= 4)
high_rated_products.show()

+----------+---------+----+-------------------+--------------------+--------------------+---------+
|CustomerID|ProductID|Rate|         CreateDate|            NickName|                Name|UnitPrice|
+----------+---------+----+-------------------+--------------------+--------------------+---------+
|    103672|      361|   5|2018/01/01 01:37:15|       nam.kimcham.1|Neil Lane Diamond...|   719.99|
|    103960|      536|   5|2018/01/01 02:36:25|            tuanpkna|Disney Treasures ...|   449.99|
|    103541|      498|   5|2018/01/01 02:36:50|         kitty.ngo.5|Citrine & Diamond...|   179.99|
|    103819|      155|   4|2018/01/01 02:37:10|    tran.anh.khoa.86|  Túi mây hình thang|       24|
|    103806|      310|   4|2018/01/01 03:36:20|        trung.la.315|Le Vian Diamond R...|  1299.99|
|    103726|      260|   4|2018/01/01 03:36:20|      phuoc.buithimy|Men's Diamond Ban...|  3999.99|
|    103651|      388|   5|2018/01/01 03:36:57|   thanh.thao.581187|Garnet MOM Heart ...|   179.99|


In [31]:
# 특정 고객 ID의 데이터 필터링
specific_customer_data = final_df.filter(final_df["CustomerID"] == 103416)
specific_customer_data.show()

+----------+---------+----+-------------------+------------+--------------------+---------+
|CustomerID|ProductID|Rate|         CreateDate|    NickName|                Name|UnitPrice|
+----------+---------+----+-------------------+------------+--------------------+---------+
|    103416|      619|   1|2018/01/01 01:36:30|vanduong0403|Le Vian Chocolate...|     1049|
|    103416|       59|   3|2018/01/05 22:37:32|vanduong0403|Hành Trình Về Phư...|        2|
|    103416|      170|   3|2018/01/07 04:37:32|vanduong0403|      Short Jeans 02|       18|
|    103416|      551|   3|2018/01/09 12:37:32|vanduong0403|Unstoppable Love ...|   149.99|
|    103416|      583|   3|2018/01/10 10:37:32|vanduong0403|Hoop Earrings 14K...|    46.99|
|    103416|      429|   3|2018/01/23 08:37:32|vanduong0403|Lab-Created Diamo...|   799.99|
|    103416|      648|   3|2018/01/24 02:37:32|vanduong0403|Diamond Promise R...|   299.99|
|    103416|      184|   1|2018/01/26 14:36:30|vanduong0403|    Paper Coffee Cup

24/06/20 13:55:11 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 935011 ms exceeds timeout 120000 ms
24/06/20 13:55:11 WARN SparkContext: Killing executors is not supported by current scheduler.
