sparksession 생성 먼저 해야 spark dataframe으로 json 데이터들을 불러올 수 있습니다

In [1]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setAll([('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.1')])
spark = SparkSession.builder \
    .appName("spark test") \
    .config(conf=conf) \
    .getOrCreate()

In [2]:
spark

In [3]:
df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
            .option("spark.mongodb.input.uri", 'mongodb://localhost:27017/nct.twitterdata') \
            .option("multiLine","true").load()
 
df.show() # spark datafrmae 출력
df.printSchema() # 스키마 출력
# df.columns # 모든 컬럼명 리스트로 출력
# df.count() # 행 개수 출력

+--------------------+------------+-----------+--------------------+------------------+--------------------+--------------------+--------------------+--------------+---------+------------+----+-------------------+-------------------+-----------------------+---------------------+-------------------------+-------------------+-----------------------+---------------+----+-----------------------------+------------------+-----------+--------------------+-------------------+--------------------+-----------------------+-----------+-------------+---------+--------------------+--------------------+-------------------------+-------------+---------+--------------------+
|                 _id|contributors|coordinates|          created_at|display_text_range|            entities|   extended_entities|      extended_tweet|favorite_count|favorited|filter_level| geo|                 id|             id_str|in_reply_to_screen_name|in_reply_to_status_id|in_reply_to_status_id_str|in_reply_to_user_id|in_reply

In [4]:
# spark sql을 사용해 spark dataframe에 쿼리를 하기 위해선 임시 뷰를 생성해야합니다.
df.createOrReplaceTempView("twitterdata")
sql_result = spark.sql("SELECT * FROM twitterdata").show(5)

+--------------------+------------+-----------+--------------------+------------------+--------------------+--------------------+--------------+--------------+---------+------------+----+-------------------+-------------------+-----------------------+---------------------+-------------------------+-------------------+-----------------------+---------------+----+-----------------------------+------------------+-----------+--------------------+-------------------+--------------------+-----------------------+-----------+-------------+---------+--------------------+--------------------+-------------------------+-------------+---------+--------------------+
|                 _id|contributors|coordinates|          created_at|display_text_range|            entities|   extended_entities|extended_tweet|favorite_count|favorited|filter_level| geo|                 id|             id_str|in_reply_to_screen_name|in_reply_to_status_id|in_reply_to_status_id_str|in_reply_to_user_id|in_reply_to_user_id_

** 건의할 사항
한국어랑 영어(ko, en) 트윗만 불러오는게 어떨지

In [12]:
df.select('lang').show(5)

+----+
|lang|
+----+
|  ko|
|  in|
| und|
|  ko|
|  en|
+----+
only showing top 5 rows



그냥 df_name.select 으로 원하는 열만 뽑아올 수도 있습니다.

ex. 가장 상위에 있는 json의 경우 단순 쿼리로 뽑아내기 가능

id, created_at, text, retweet_count, favorite_count

In [7]:
depth0 = df.select("id","created_at", "text", "retweet_count", "favorite_count")
depth0.show(5)

+-------------------+--------------------+-------------------------+-------------+--------------+
|                 id|          created_at|                     text|retweet_count|favorite_count|
+-------------------+--------------------+-------------------------+-------------+--------------+
|1412595075171848192|Wed Jul 07 02:11:...|NCT DREAM(엔시티 드림)...|            0|             0|
|1412602031739604992|Wed Jul 07 02:39:...|     RT @Bubuyongi: NC...|            0|             0|
|1412602031647334405|Wed Jul 07 02:39:...|     @markleebase http...|            0|             0|
|1412602032138035205|Wed Jul 07 02:39:...|  RT @mbcplusm: 주간아...|            0|             0|
|1412602031978647558|Wed Jul 07 02:39:...|     RT @captainuwu: N...|            0|             0|
+-------------------+--------------------+-------------------------+-------------+--------------+
only showing top 5 rows



In [10]:
depth0.count()

576

행에 nested 되어있을 경우 (user)

user - name, screen_name, profile_image_url

In [11]:
user_df = df.select("user.*")
user_df.createOrReplaceTempView("user_sql")
user = spark.sql("select name, screen_name, profile_image_url from user_sql")
user.show(5)

+-----------------+-------------+--------------------+
|             name|  screen_name|   profile_image_url|
+-----------------+-------------+--------------------+
|교보핫트랙스 창원| hottracks_cw|http://pbs.twimg....|
|              za.|blueitubiruu_|http://pbs.twimg....|
|fariha | 31 dwm📌|     eymarkeu|http://pbs.twimg....|
|               🐧|   parkjiajun|http://pbs.twimg....|
|  จอง เนียร์young| ianew_skpm10|http://pbs.twimg....|
+-----------------+-------------+--------------------+
only showing top 5 rows



retweeted_status - id

: 리트윗한 트윗일 경우 원본 트윗의 id

In [14]:
from pyspark.sql.functions import expr


retweet_df = df.select("retweeted_status.*")
retweet_df.createOrReplaceTempView("ret_sql")
retweet_sql = spark.sql("select id from ret_sql")
# id 행 이미 있으니까 RT_id 라고 행 이름 변경
retweet = retweet_sql.withColumn("RT_id", expr("id"))
retweet = retweet.drop('id')
retweet.show(5)

+-------------------+
|              RT_id|
+-------------------+
|               null|
|1412440500527796228|
|               null|
|1412588863088848900|
|1412564786848534533|
+-------------------+
only showing top 5 rows



위에서 만든거 합침

In [15]:
from pyspark.sql.functions import monotonically_increasing_id

DF1 = depth0.withColumn("row_id", monotonically_increasing_id())
DF2 = user.withColumn("row_id", monotonically_increasing_id())
dep_usr = DF1.join(DF2, ("row_id")).drop("row_id")

In [16]:
dep_usr.show(5)

+-------------------+--------------------+--------------------+-------------+--------------+-----------------+------------+--------------------+
|                 id|          created_at|                text|retweet_count|favorite_count|             name| screen_name|   profile_image_url|
+-------------------+--------------------+--------------------+-------------+--------------+-----------------+------------+--------------------+
|1412602037892567041|Wed Jul 07 02:39:...|RT @iukkxrddfhan:...|            0|             0|           rpzlee| Rapunzelx31|http://pbs.twimg....|
|1412602038236680193|Wed Jul 07 02:39:...|RT @nct_menfess: ...|            0|             0|   Kasihh.tryanaa|KasihhTryana|http://pbs.twimg....|
|1412657043043852289|Wed Jul 07 06:17:...|RT @NCT127STRMth:...|            0|             0|      🌈🌤🌱✋🏼🙂|    knick_jj|http://pbs.twimg....|
|1412602051884851207|Wed Jul 07 02:39:...|RT @markeu_yayy: ...|            0|             0|kyy ◡̎ | 31dwm 📌|  markyourss|http://pbs.tw

dep_usr 을 monotonically_increasing_id 로 임시 row를 생성해서 retweet이랑 join 시도

monotonically_increasing_id 자체가 unique 와 increase 속성은 보장하지만 consecutive 한 속성은 보장하지 않아서 DF1 의 row_id가 0,1,2,8589934592,8589934593,... 이런식으로 인덱싱됨

DF2의 row_id는 0,1,2,3,.. 으로 들어가서 두개 join하면 0,1,2행밖에 조인이 안됨

연속적인 숫자를 생성하는 row_number() 함수를 사용

In [62]:
from pyspark.sql.functions import *
from pyspark.sql.window import *

DF1_tmp = dep_usr.withColumn("row_id", monotonically_increasing_id())
window = Window.orderBy(col('row_id'))
DF1 = DF1_tmp.withColumn('row_id', row_number().over(window))

DF2_tmp = retweet.withColumn("row_id", monotonically_increasing_id())
DF2 = DF2_tmp.withColumn('row_id', row_number().over(window))
result = DF1.join(DF2, ("row_id")).drop("row_id"); result.show(5)

+-------------------+--------------------+--------------------+-------------+--------------+-----------------+------------+--------------------+-------------------+
|                 id|          created_at|                text|retweet_count|favorite_count|             name| screen_name|   profile_image_url|              RT_id|
+-------------------+--------------------+--------------------+-------------+--------------+-----------------+------------+--------------------+-------------------+
|1412602037892567041|Wed Jul 07 02:39:...|RT @iukkxrddfhan:...|            0|             0|           rpzlee| Rapunzelx31|http://pbs.twimg....|               null|
|1412602038236680193|Wed Jul 07 02:39:...|RT @nct_menfess: ...|            0|             0|   Kasihh.tryanaa|KasihhTryana|http://pbs.twimg....|1412440500527796228|
|1412657043043852289|Wed Jul 07 06:17:...|RT @NCT127STRMth:...|            0|             0|      🌈🌤🌱✋🏼🙂|    knick_jj|http://pbs.twimg....|               null|
|14126020518848

In [63]:
result.count()

576

In [64]:
DF1.select("row_id").show()

+------+
|row_id|
+------+
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
|    20|
+------+
only showing top 20 rows



In [65]:
DF2.select("row_id").show()

+------+
|row_id|
+------+
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
|    20|
+------+
only showing top 20 rows



spark dataframe local에 저장

In [None]:
# json으로 저장
# df_name.write.format("json").mode("overwrite").save("file.json")
# csv 로 저장
result.write.option("header", True).format("csv").mode("overwrite").save("result.csv")

pandas dataframe으로 변경

In [66]:
import pandas as pd

pandas = result.select("*").toPandas()
pandas

Unnamed: 0,id,created_at,text,retweet_count,favorite_count,name,screen_name,profile_image_url,RT_id
0,1412602037892567041,Wed Jul 07 02:39:05 +0000 2021,RT @iukkxrddfhan: pls rt แจก 🌈🌤\nบั้มเปล่าจีเว...,0,0,rpzlee,Rapunzelx31,http://pbs.twimg.com/profile_images/1410818825...,
1,1412602038236680193,Wed Jul 07 02:39:05 +0000 2021,RT @nct_menfess: Kata Mark Haechan itu mirip b...,0,0,Kasihh.tryanaa,KasihhTryana,http://pbs.twimg.com/profile_images/1387236819...,1.412441e+18
2,1412657043043852289,Wed Jul 07 06:17:39 +0000 2021,RT @NCT127STRMth: 210707 แทอิล (mo.on_air) กดฟ...,0,0,🌈🌤🌱✋🏼🙂,knick_jj,http://pbs.twimg.com/profile_images/9577156416...,
3,1412602051884851207,Wed Jul 07 02:39:08 +0000 2021,RT @markeu_yayy: ✧☞ day 6🐯🍉\n\n[Q : iconic wor...,0,0,kyy ◡̎ | 31dwm 📌,markyourss,http://pbs.twimg.com/profile_images/1410500304...,1.412589e+18
4,1412616883702927364,Wed Jul 07 03:38:04 +0000 2021,RT @HanteoNews: #HANTEO GLOBAL K-POP REPORT : ...,0,0,syaa,myleejeno00,http://pbs.twimg.com/profile_images/1412616011...,1.412565e+18
...,...,...,...,...,...,...,...,...,...
571,1412657035833876491,Wed Jul 07 06:17:37 +0000 2021,RT @vlive_th: เย็นวันนี้❣️ ซีจือนี่เตรียมแท่งไ...,0,0,&,mrsjeong5,http://pbs.twimg.com/profile_images/1396818007...,1.413492e+18
572,1412657036362407944,Wed Jul 07 06:17:38 +0000 2021,RT @SMerch_store: Sharing Postcard book NCT 12...,0,0,#sociallydistant,richardhdent,http://pbs.twimg.com/profile_images/1318765142...,1.413510e+18
573,1412616889583370242,Wed Jul 07 03:38:06 +0000 2021,กติกา\n1.กดฟอลไอจี kpopshop_everyty \n2.กดฟอล...,0,0,ตอบแชทไวมาก🔥 รับผ่อนอัลบั้ม,everytyshop,http://pbs.twimg.com/profile_images/1394989610...,1.413493e+18
574,1412657040871276546,Wed Jul 07 06:17:39 +0000 2021,RT @Billboard_JAPAN: 【今週のダウンロード・アルバム・チャート“Down...,0,0,김⁷ |🍷𝑃𝑒𝑟𝑚𝑖𝑠𝑠𝑖𝑜𝑛 𝑇𝑜 𝐷𝑎𝑛𝑐𝑒🤠⅓,parkjm_0613,http://pbs.twimg.com/profile_images/1412565215...,1.413493e+18


In [67]:
pandas['id'].count()

576

빅쿼리로 보내기

In [None]:
# Saving the data to BigQuery
word_count.write.format('bigquery') \
  .option('table', 'wordcount_dataset.wordcount_output') \
  .save()