# Spark 연동 및 Spark SQL 테스트

In [1]:
import os
import time 
from pyspark.sql import Row

In [2]:
# Load the MongoDB collection named in the URI as a Spark DataFrame
# (source: the MongoDB Spark connector's DefaultSource).
df = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://localhost/twitter.ko_tweet")
    .load()
)

                                                                                

In [3]:
# Register df under the name "tweets" so it can be queried with spark.sql().
df.createOrReplaceTempView("tweets")

In [4]:
# Example: preview the first 5 rows of df.
df.show(5)

+--------------------+--------------------+--------------------+
|                 _id|          _timestamp|                data|
+--------------------+--------------------+--------------------+
|{61ffbb80f6b8e9bd...|2022-02-06T21:13:...|{{null}, 11366454...|
|{61ffbb81f6b8e9bd...|2022-02-06T21:13:...|{{null}, 13185747...|
|{61ffbb81f6b8e9bd...|2022-02-06T21:13:...|{{null}, 13209349...|
|{61ffbb81f6b8e9bd...|2022-02-06T21:13:...|{{null}, 24044293...|
|{61ffbb81f6b8e9bd...|2022-02-06T21:13:...|{{null}, 30422362...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [5]:
#By using col() function
from pyspark.sql.functions import col
# df.select(col("data.created_at"), col("data.text"), col("data.lang") == 'ko').show(truncate=False)
# df.filter("data.lang = 'ko'").collect()

In [7]:
# spark.sql("select data from tweets").show(3)

In [6]:
# Count tweets per language (data.lang) in the "tweets" view, largest count first.
query = """
select
        data.lang
    , count(*) as cnt
from tweets
group by data.lang
order by cnt desc
"""

### 속도가 매우 느리다
- 쿼리의 실행 속도는 스토리지의 성능에 의존한다.
- MongoDB는 열 지향 스토리지처럼 컬럼 단위 읽기에 최적화되어 있지 않으므로, 그대로는 고속 집계에 적합하지 않다.
- 최적화를 위해서는 한 차례 데이터를 추출해야 한다.
- 참고: 책 234 page

In [11]:
# Measure how long the aggregation in `query` takes end to end.
# perf_counter() is a monotonic, high-resolution clock, so the elapsed value
# cannot be distorted by system clock adjustments the way time.time() can.
# NOTE: `query` is reassigned in several later cells; this cell expects the
# per-language count query to be the current binding.
strt_time = time.perf_counter()
spark.sql(query).show()
print("time: ", time.perf_counter() - strt_time)



+----+------+
|lang|   cnt|
+----+------+
|  en|284549|
|  ja|189826|
| und| 88200|
|  th| 61411|
|  es| 56196|
|  ko| 44486|
|  ar| 44412|
|  tr| 42052|
|  pt| 40400|
|  in| 38053|
|  fr| 19267|
|  tl| 14915|
|  hi| 14359|
|  it|  9141|
|  ru|  5946|
|  de|  5654|
|  zh|  5024|
|  fa|  4956|
|  pl|  4031|
|  ur|  3548|
+----+------+
only showing top 20 rows

time:  30.504709243774414


                                                                                

## 텍스트 데이터의 가공
- 234 page

In [10]:
# Show every top-level column plus the nested tweet text as its own column.
(
    spark
    .sql("select t1.*,  t1.data.text from tweets t1")
    .show(3)
)

+--------------------+--------------------+--------------------+-----------------------+
|                 _id|          _timestamp|                data|                   text|
+--------------------+--------------------+--------------------+-----------------------+
|{61ffbb80f6b8e9bd...|2022-02-06T21:13:...|{{null}, 11366454...|@dkdtmxkcoth 나도 같...|
|{61ffbb81f6b8e9bd...|2022-02-06T21:13:...|{{null}, 13185747...|   RT @Rmlove09127: ...|
|{61ffbb81f6b8e9bd...|2022-02-06T21:13:...|{{null}, 13209349...|   RT @BYangsalang: ...|
+--------------------+--------------------+--------------------+-----------------------+
only showing top 3 rows



In [39]:
# `data.created_at` is an ISO-8601 string (e.g. '2022-02-06T12:13:41.000Z'),
# not epoch milliseconds, so dividing it by 1000 and passing it to
# from_unixtime() cannot produce a valid time. Parse it with to_timestamp()
# and format it to the same 'yyyy-MM-dd HH:mm:ss' string shape instead.
query = """
select date_format(to_timestamp(data.created_at), 'yyyy-MM-dd HH:mm:ss') time, data.text
from tweets where data.lang = 'en'
"""

# Same selection with the raw created_at string left untouched.
query_2 = """
select data.created_at, data.text
from tweets where data.lang = 'en'
"""

In [113]:
# Preview the raw created_at strings and text selected by query_2.
spark.sql(query_2).show(3)

+--------------------+--------------------+
|          created_at|                text|
+--------------------+--------------------+
|2022-02-06T12:13:...|@EyesRightPhoto @...|
|2022-02-06T12:13:...|RT @IanByrneMP: R...|
|2022-02-06T12:13:...| @ifnotyoursha tilly|
+--------------------+--------------------+
only showing top 3 rows



In [11]:
from pyspark.sql import Row
from datetime import datetime
import dateutil.parser

def string_to_datetime(str):
    """Reformat a date/time string as ``YYYY-MM-DD HH:MM:SS``.

    The input is parsed with ``dateutil.parser.parse`` and the parsed value
    is rendered with ``strftime``; the result is a string, not a datetime.

    Note: the parameter name shadows the built-in ``str`` inside this
    function; it is kept unchanged for existing callers.
    """
    output_format = '%Y-%m-%d %H:%M:%S'
    parsed = dateutil.parser.parse(str)
    return parsed.strftime(output_format)

def text_split(row):
    """Split one tweet record into per-word rows (generator function).

    Args:
        row: A record exposing ``row.data.text`` and ``row.data.created_at``.

    Yields:
        ``Row(time=..., word=...)`` for each whitespace-separated token of
        the tweet text, where ``time`` is ``created_at`` converted by
        ``string_to_datetime``.

    Records with no ``data``, no text, or text without any token yield
    nothing.
    """
    data = row.data
    if not data:
        return

    text = data.text
    if not text:
        # Null or empty text: nothing to split (calling .split() on None
        # would raise AttributeError and fail the whole Spark task).
        return

    words = text.split()
    if not words:
        return

    # created_at is identical for every word of a tweet, so convert it once
    # instead of re-parsing the same string for each word.
    created = string_to_datetime(data.created_at)
    for word in words:
        yield Row(time=created, word=word)
            

#### '.rdd'로 원시 레코드 참조

In [14]:
# Fetch the first 5 records of df as raw Row objects via its RDD.
df.rdd.take(5)

[Row(_id=Row(oid='61ffbb80f6b8e9bd026bb4f6'), _timestamp='2022-02-06T21:13:52.836027+09:00', data=Row(attachments=Row(media_keys=None), author_id='1136645494019461120', context_annotations=None, conversation_id='1490279570297307137', created_at='2022-02-06T12:13:41.000Z', entities=Row(hashtags=None, mentions=[Row(start=0, end=12, username='dkdtmxkcoth', id='1424626232667242496')], urls=None), id='1490297645189177344', in_reply_to_user_id='1424626232667242496', lang='ko', possibly_sensitive=False, public_metrics=Row(retweet_count=0, reply_count=0, like_count=0, quote_count=0), referenced_tweets=[Row(type='replied_to', id='1490297548354899972')], reply_settings='everyone', source='Twitter for Android', text='@dkdtmxkcoth 나도 같커잖아...')),
 Row(_id=Row(oid='61ffbb81f6b8e9bd026bb4ff'), _timestamp='2022-02-06T21:13:53.025851+09:00', data=Row(attachments=Row(media_keys=None), author_id='1318574774214225927', context_annotations=None, conversation_id='1490297649399926789', created_at='2022-02-06

#### flatMap()에 제너레이터 함수 적용

In [15]:
# Apply the text_split generator to each record and flatten the results;
# take(5) returns the first 5 (time, word) rows.
df.rdd.flatMap(text_split).take(5)

[Row(time='2022-02-06 12:13:41', word='@dkdtmxkcoth'),
 Row(time='2022-02-06 12:13:41', word='나도'),
 Row(time='2022-02-06 12:13:41', word='같커잖아...'),
 Row(time='2022-02-06 12:13:42', word='RT'),
 Row(time='2022-02-06 12:13:42', word='@Rmlove09127:')]

#### toDF()를 사용해 데이터 프레임으로 변환

In [16]:
# Convert the flattened (time, word) rows back into a DataFrame; the cell
# output is the DataFrame's schema summary.
df.rdd.flatMap(text_split).toDF()

DataFrame[time: string, word: string]

### Spark 프로그램에 있어서의 DAG 실행
- 235 page

In [18]:
# Build the per-word DataFrame (columns: time, word) from every record in df.
# NOTE(review): text_split applies no language filter, so despite the `ko_`
# prefix this holds words from all tweets present in df — confirm intent.
ko_words = df.rdd.flatMap(text_split).toDF()

In [19]:
# Register ko_words under the name "ko_words" for use in spark.sql().
ko_words.createOrReplaceTempView("ko_words")

In [20]:
# Word frequency over the "ko_words" temp view, highest count first.
query = """
select
        word
    , count(*) as cnt
from ko_words
group by word
order by cnt desc
"""

# Show the 10 most frequent words.
spark.sql(query).show(10)

[Stage 12:>                                                         (0 + 4) / 4]

+----+------+
|word|   cnt|
+----+------+
|  RT|561390|
|너무| 39247|
|진짜| 36304|
|  이| 24138|
|   :| 23896|
|  다| 23381|
|  수| 23294|
|   -| 21314|
|  아| 19835|
|  한| 19593|
+----+------+
only showing top 10 rows



                                                                                

### 저장
- 책 236page 

In [21]:
# Persist ko_words as a table named "ko_words_20220214".
# The save mode is set explicitly: with the default mode, saveAsTable raises
# when the table already exists, so re-running this cell would fail.
# "overwrite" replaces the table with the current contents of ko_words.
ko_words.write.mode("overwrite").saveAsTable("ko_words_20220214")

22/02/14 23:30:17 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
22/02/14 23:30:17 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
22/02/14 23:30:19 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
22/02/14 23:30:19 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore mentha@127.0.1.1
22/02/14 23:38:59 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
22/02/14 23:38:59 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
22/02/14 23:38:59 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
22/02/14 23:38:59 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist


In [22]:
# Recursively list the files under the local spark-warehouse directory.
!ls -R spark-warehouse

spark-warehouse:
ko_words_20220214  ko_words_sample

spark-warehouse/ko_words_20220214:
part-00000-50910220-3ef8-42cf-81df-4416681be13b-c000.snappy.parquet
part-00001-50910220-3ef8-42cf-81df-4416681be13b-c000.snappy.parquet
part-00002-50910220-3ef8-42cf-81df-4416681be13b-c000.snappy.parquet
part-00003-50910220-3ef8-42cf-81df-4416681be13b-c000.snappy.parquet
part-00004-50910220-3ef8-42cf-81df-4416681be13b-c000.snappy.parquet
part-00005-50910220-3ef8-42cf-81df-4416681be13b-c000.snappy.parquet
part-00006-50910220-3ef8-42cf-81df-4416681be13b-c000.snappy.parquet
part-00007-50910220-3ef8-42cf-81df-4416681be13b-c000.snappy.parquet
part-00008-50910220-3ef8-42cf-81df-4416681be13b-c000.snappy.parquet
part-00009-50910220-3ef8-42cf-81df-4416681be13b-c000.snappy.parquet
part-00010-50910220-3ef8-42cf-81df-4416681be13b-c000.snappy.parquet
part-00011-50910220-3ef8-42cf-81df-4416681be13b-c000.snappy.parquet
part-00012-50910220-3ef8-42cf-81df-4416681be13b-c000.snappy.parquet
part-00013-50910220-3ef8-42c

### 저장한 파일 사용하기

In [23]:
# Row count of the saved table, read back by name.
spark.table("ko_words_20220214").count()

11629522

In [24]:
# Preview the first 3 rows of the saved table.
spark.table("ko_words_20220214").show(3)

+-------------------+---------+
|               time|     word|
+-------------------+---------+
|2022-02-13 15:55:35|       RT|
|2022-02-13 15:55:35|@s_bakkk:|
|2022-02-13 15:55:35|  #겹친소|
+-------------------+---------+
only showing top 3 rows



In [28]:
# Word frequency computed from the saved table ko_words_20220214 (rather than
# the temp view), highest count first.
query = """
select
        word
    , count(*) as cnt
from ko_words_20220214
group by word
order by cnt desc
"""

# Show the 50 most frequent words.
spark.sql(query).show(50)



+-----------------+------+
|             word|   cnt|
+-----------------+------+
|               RT|561390|
|             너무| 39247|
|             진짜| 36304|
|               이| 24138|
|                :| 23896|
|               다| 23381|
|               수| 23294|
|                -| 21314|
|               아| 19835|
|               한| 19593|
|               안| 19313|
|               나| 19238|
|               잘| 18937|
|               더| 18695|
|               거| 18434|
|               그| 17828|
|             내가| 16411|
|             오늘| 16301|
|             선수| 16281|
|             우리| 16065|
|               내| 14520|
|               것| 14329|
|               왜| 13742|
|             그냥| 13593|
|             지금| 13419|
|             근데| 13115|
|             있는| 12211|
|               저| 11905|
|               좀| 11898|
|             많이| 11796|
|               또| 11572|
|                || 11546|
|             하는| 11544|
|             이거| 11383|
|               때| 10940|
|             보고| 

                                                                                

In [35]:
# Word frequency per day: substr(time, 1, 10) keeps the first 10 characters
# of the time string, i.e. the 'YYYY-MM-DD' part, and counts are grouped by
# that date and the word.
query = """
select
      substr(time, 1, 10) as date  
    , word
    , count(*) as cnt
from ko_words_20220214
group by 
        substr(time, 1, 10)
    , word
order by cnt desc
"""

# Show the 50 largest (date, word) counts.
spark.sql(query).show(50)

22/02/14 23:55:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/02/14 23:55:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/02/14 23:55:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/02/14 23:55:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
[Stage 48:>                                                         (0 + 4) / 4]

+----------+----+-----+
|      date|word|  cnt|
+----------+----+-----+
|2022-02-07|  RT|87291|
|2022-02-11|  RT|84064|
|2022-02-13|  RT|82991|
|2022-02-10|  RT|77298|
|2022-02-14|  RT|64177|
|2022-02-08|  RT|57218|
|2022-02-12|  RT|53338|
|2022-02-06|  RT|31867|
|2022-02-09|  RT|23146|
|2022-02-07|진짜| 6555|
|2022-02-11|너무| 5925|
|2022-02-13|너무| 5899|
|2022-02-10|너무| 5802|
|2022-02-11|진짜| 5782|
|2022-02-07|너무| 5369|
|2022-02-13|진짜| 5304|
|2022-02-07|  다| 4640|
|2022-02-10|진짜| 4600|
|2022-02-07|   :| 4291|
|2022-02-14|   :| 4139|
|2022-02-14|너무| 3889|
|2022-02-14|진짜| 3889|
|2022-02-12|너무| 3871|
|2022-02-07|선수| 3797|
|2022-02-10|   -| 3777|
|2022-02-14|오늘| 3759|
|2022-02-07|  잘| 3750|
|2022-02-07|  이| 3741|
|2022-02-08|너무| 3722|
|2022-02-11|  이| 3679|
|2022-02-13|  이| 3675|
|2022-02-07|  수| 3644|
|2022-02-10|  이| 3505|
|2022-02-13|  다| 3476|
|2022-02-13|  한| 3423|
|2022-02-08|진짜| 3395|
|2022-02-07|중국| 3373|
|2022-02-11|  수| 3357|
|2022-02-13|  수| 3338|
|2022-02-11|   :| 3300|
|2022-02-13

                                                                                

In [39]:
# Collect only Korean tweets on a daily basis and check the trend
# Word counts are checked every day

In [36]:
# Run the current `query` and collect its full result into a pandas DataFrame.
# NOTE(review): this rebinds `df`, which earlier in the notebook is the Spark
# DataFrame loaded from MongoDB; cells that use `df.rdd` will not work once
# this has run. Consider a distinct name (and update the to_csv cell with it).
df = spark.sql(query).toPandas()

22/02/14 23:55:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

In [38]:
# Write df to ko_words_date.csv (relative path) with UTF-8 encoding.
# NOTE(review): pandas also writes the row index as the first column unless
# index=False is passed — confirm whether that column is wanted.
df.to_csv("ko_words_date.csv", encoding="utf8")

### 날짜 예제
- 참고1: https://ourcstory.tistory.com/109
- 참고2: https://docs.python.org/ko/3.9/library/datetime.html#strftime-and-strptime-format-codes

In [80]:
from datetime import datetime
import tzlocal

# Convert a Unix timestamp into a datetime in the machine's local time zone
# and print it with microseconds, UTC offset and zone name.
unix_timestamp = float("1284101485")
local_timezone = tzlocal.get_localzone() # local time zone object from tzlocal
local_time = datetime.fromtimestamp(unix_timestamp, local_timezone)
print(local_time.strftime("%Y-%m-%d %H:%M:%S.%f%z (%Z)"))

# Parse a 'YYYY-MM-DDTHH:MM:SS' string; as the last expression of the cell,
# the resulting datetime is displayed as the cell output.
datetime.strptime("2022-02-06T12:13:41", '%Y-%m-%dT%H:%M:%S')

2010-09-10 15:51:25.000000+0900 (KST)


datetime.datetime(2022, 2, 6, 12, 13, 41)

In [91]:
# Reference: https://stackoverflow.com/questions/214777/how-do-you-convert-yyyy-mm-ddthhmmss-000z-time-format-to-mm-dd-yyyy-time-forma
from datetime import datetime
# Option 1: strptime with an explicit format; %f consumes the fractional
# seconds and the trailing 'Z' is matched as a literal character.
date_format = "%Y-%m-%dT%H:%M:%S.%fZ" 
# NOTE: the result is neither assigned nor the cell's last expression, so it
# is not displayed; the expected value is shown on the next line.
datetime.strptime('2008-09-26T01:51:42.000Z', date_format)
# -> datetime(2008, 9, 26, 1, 51, 42)

# Option 2: let dateutil detect the format.
import dateutil.parser

d = dateutil.parser.parse('2008-09-26T01:51:42.000Z')
print(d.strftime('%m/%d/%Y'))  #==> '09/26/2008'

09/26/2008
