In [29]:
import pyspark
import json

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import lit # to add null column
from pyspark.sql.functions import monotonically_increasing_id

# Spark session ==========================================
# Local master; getOrCreate() reuses an already-running session if there is one.
spark = (
    pyspark.sql.SparkSession.builder
    .master('local')
    .appName('ML-learning')
    .config(conf=pyspark.SparkConf())
    .getOrCreate()
)

# Load the raw tweets (schema is inferred from the JSON) and display it.
disasterDf = spark.read.json("data/disaster.json")
disasterDf.printSchema()

# Drop the language, username and created_at columns =====
disasterDf = disasterDf.drop("language", "username", "created_at")


# Join each row's hashtags with "%20" and store them as "keyword" ====
def _join_hashtags(tags):
    """Join a row's hashtags into a single "%20"-separated string.

    Args:
        tags: list of hashtag strings, or None when the row has no
            ``hashtags`` value (the inferred schema marks both the array
            and its elements as nullable).

    Returns:
        The joined string ("" for an empty list), or None when ``tags``
        is None so that the missing value stays a SQL NULL.
    """
    if tags is None:
        return None
    # Null elements would make str.join raise TypeError, so skip them.
    return "%20".join(tag for tag in tags if tag is not None)


mergeHashtags = udf(_join_hashtags, StringType())
disasterDf = disasterDf.withColumn("keyword", mergeHashtags(disasterDf["hashtags"]))
disasterDf = disasterDf.drop('hashtags')

# Reshape the frame into (id, keyword, location, text) ===
#   * "tweet" is renamed to "text"
#   * "location" is added as an all-null string column
#   * "id" comes from monotonically_increasing_id(): unique and increasing,
#     but only consecutive within a single partition
disasterDf = (
    disasterDf
    .withColumnRenamed("tweet", "text")
    .withColumn("location", lit(None).cast(StringType()))
    .withColumn("id", monotonically_increasing_id())
    .select("id", "keyword", "location", "text")
)
disasterDf.show()

# The cleaned DataFrame is written to disk in the last cell of the notebook,
# after the text-cleaning steps have run.

root
 |-- created_at: long (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- language: string (nullable = true)
 |-- tweet: string (nullable = true)
 |-- username: string (nullable = true)

+---+--------------------+--------+--------------------+
| id|             keyword|location|                text|
+---+--------------------+--------+--------------------+
|  0|dogecoin%20shib%2...|    null|"fair economy, wo...|
|  1|hungry%20starving...|    null|Gaming rn and I h...|
|  2|conspiracytheorie...|    null|@Loserfruit Forgo...|
|  3|lennyhenry%20redn...|    null|Remember when #Le...|
|  4|europe%20africa%2...|    null| https://t.co/8Y1...|
|  5|gold%20diamond%20...|    null|7.4 million has #...|
|  6|underpaid%20starv...|    null|@serenawilliams s...|
|  7|america%20africa%...|    null|Minivans at the F...|
|  8|black%20blackpeop...|    null|@kimKBaltimore #b...|
|  9|shortstory%20crea...|    null|I just published ...|
| 10|colo

[Stage 1:>                                                          (0 + 1) / 1]                                                                                

### URL 제거

In [30]:
import re

# Matches http:// or https:// followed by letters, digits, "/" and ".".
# NOTE(review): characters such as "-", "_", "?", "=" and "%" are not in the
# class, so a URL containing them is only removed up to that character.
html_regexps = re.compile(r"https?://[a-zA-Z0-9/.]*\b")


def _strip_urls(text):
    """Remove http(s) links from ``text``.

    Args:
        text: tweet text, or None for a row without text.

    Returns:
        ``text`` with every match of ``html_regexps`` deleted, or None when
        ``text`` is None (re.sub would raise TypeError on None and fail the
        Spark task).
    """
    if text is None:
        return None
    return html_regexps.sub("", text)


removeHtml = udf(_strip_urls, StringType())
disasterDf = disasterDf.withColumn("text", removeHtml(disasterDf['text']))


### 이모티콘 제거

In [31]:
# Character class of emoji / pictographic code points to strip from the text.
#
# The previous class contained the range U+24C2-U+1F251, which spans almost
# the whole Basic Multilingual Plane above U+24C2 and therefore also deleted
# Hangul, kana and CJK text. It is narrowed here to the enclosed-alphanumerics
# tail; the few BMP emoji that the wide range caught by accident are listed
# explicitly. Ranges already contained in U+2500-U+2BEF (U+2600-U+2B55,
# U+2640-U+2642, U+2702-U+27B0) are not repeated.
emoji_pattern = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F1E0-\U0001F1FF"  # regional indicators (flags)
    "\U0001F926-\U0001F937"  # face-palm .. shrug gestures
    "\U00010000-\U0010FFFF"  # every other supplementary-plane code point
    "\U000024C2-\U000024FF"  # enclosed alphanumerics (from circled M)
    "\U00002500-\U00002BEF"  # box drawing .. misc symbols, dingbats, arrows
    "\u200d"  # zero-width joiner
    "\u23cf"  # eject symbol
    "\u23e9"  # fast-forward
    "\u231a"  # watch
    "\ufe0f"  # variation selector-16
    "\u3030"  # wavy dash
    "\u303d"  # part alternation mark
    "\u3297"  # circled ideograph "congratulation"
    "\u3299"  # circled ideograph "secret"
    "]+",
    re.UNICODE,
)


def _strip_emoji(text):
    """Delete every run of characters matched by ``emoji_pattern``.

    Args:
        text: tweet text, or None for a row without text.

    Returns:
        The text without emoji, or None when ``text`` is None (re.sub would
        raise TypeError on None and fail the Spark task).
    """
    if text is None:
        return None
    return emoji_pattern.sub("", text)


remove_emoji = udf(_strip_emoji, StringType())
disasterDf = disasterDf.withColumn("text", remove_emoji(disasterDf["text"]))

### punctuations 제거

In [32]:
import string

# Small demo of punctuation stripping with str.translate.
myStr = 'asdfa ;ja;9j2r; ok;aoisjd f;j;aoIJ R;OA2J'

# Translation table mapping every ASCII punctuation code point to None,
# which makes str.translate delete that character.
table = {ord(mark): None for mark in string.punctuation}

myStr.translate(table)

'asdfa ja9j2r okaoisjd fjaoIJ ROA2J'

In [33]:
# Translation table that deletes every character in string.punctuation
# (this includes "#" and "@", so hashtags and mentions lose their prefix).
table = str.maketrans('', '', string.punctuation)


def _strip_punctuation(text):
    """Remove ASCII punctuation from ``text``.

    Args:
        text: tweet text, or None for a row without text.

    Returns:
        The text with punctuation deleted (whitespace is left as is), or
        None when ``text`` is None, instead of raising AttributeError
        inside the Spark task.
    """
    if text is None:
        return None
    return text.translate(table)


remove_punctuation = udf(_strip_punctuation, StringType())
disasterDf = disasterDf.withColumn("text", remove_punctuation(disasterDf["text"]))

In [34]:
# Spot-check one cleaned tweet: fetch the row whose id is 243 as a list of Rows.
disasterDf.filter(disasterDf["id"] == 243).collect()

[Row(id=243, keyword='train%20fire%20boxing%20thesweatlife', location=None, text='On fire  just like my face  train fire boxing thesweatlife  ')]

In [36]:
# Persist the cleaned tweets as JSON. Spark writes a directory of part files
# at this path. The default save mode raises an error when the path already
# exists, which made this cell fail on every re-run of the notebook;
# "overwrite" replaces the previous output instead.
disasterDf.write.mode("overwrite").json("data/disaster-new.json")

# Release the local Spark session once the output has been written.
spark.stop()