# Wstępna obróbka danych

In [None]:
#| include: false

# FOR LOCAL APP ONLY (on EMR the package is provided via %%configure instead):
# download the spark-xml jar from the misc bucket and start a local Spark
# session that loads it through `spark.jars`.
import os

import boto3

MISC_BUCKET = "misc-beer-and-wine"
# NOTE(review): the S3 key says 0.5.0 while the local file name (and the
# surrounding text) say 0.15.0 — confirm which jar version is stored in the bucket.
JAR_KEY = "spark-xml_2.12-0.5.0.jar"
PATH = "misc/jars/spark-xml_2.12-0.15.0.jar"

s3 = boto3.client('s3')
# download_file does not create missing parent directories; make sure the
# target folder exists so the download does not fail on a fresh checkout.
os.makedirs(os.path.dirname(PATH), exist_ok=True)
s3.download_file(MISC_BUCKET, JAR_KEY, PATH)

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[12]").appName("MyApp").config("spark.jars", PATH).getOrCreate()

## Konfiguracja aplikacji

W celu przygotowania danych do analizy zostały one wstępnie przetworzone. Pierwszym etapem wstępnego przetwarzania jest wczytanie danych do środowiska analitycznego. Dane surowe, przechowywane w koszyku `raw-data-beer-and-wine`, znajdowały się w mało przyjaznym dla analiz formacie `xml`. Wczytanie tego typu danych wymagało załadowania dodatkowego pakietu `jar` o nazwie `spark-xml_2.12:0.15.0`, pobranego z repozytorium `maven`. 

W serwisie `EMR` można dodać tego typu pakiety, wykorzystując specjalne polecenia typu `Sparkmagic` rozpoczynające się od znaków `%%`. W tym przypadku użyto `%%configure`:

In [None]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "com.databricks:spark-xml_2.12:0.15.0"
    }
}

In [2]:
from pyspark.sql import SparkSession

# getOrCreate() hands back the session Sparkmagic already started (see the
# "SparkSession available as 'spark'" message) or builds a new one.
spark = (
    SparkSession.builder
    .appName("Preprocessing")
    .getOrCreate()
)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2,application_1674145937048_0003,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Schematy danych

### Users

In [4]:
# Explicit imports instead of `import *`: these are all the Spark type
# classes used by the schema cells of this notebook.
from pyspark.sql.types import (
    BooleanType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Schema of Users.xml: every <row> attribute becomes a column prefixed with "_".
# All fields are declared nullable (third StructField argument).
users_schema = StructType([
    StructField('_AboutMe', StringType(), True),
    StructField('_AccountId', IntegerType(), True),
    StructField('_CreationDate', TimestampType(), True),
    StructField("_DisplayName", StringType(), True),
    StructField("_DownVotes", IntegerType(), True),
    StructField("_Id", IntegerType(), True),
    StructField("_LastAccessDate", TimestampType(), True),
    StructField("_Location", StringType(), True),
    StructField("_ProfileImageUrl", StringType(), True),
    StructField("_Reputation", IntegerType(), True),
    StructField("_UpVotes", IntegerType(), True),
    StructField("_Views", IntegerType(), True),
    StructField("_WebsiteUrl", StringType(), True)
])

users = (
    spark.read.format('xml')
    .options(rowTag='row')
    .schema(users_schema)
    .load("s3://raw-data-beer-and-wine/Users.xml")
)
users.show(2, vertical=True, truncate=50)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0--------------------------------------------------------------
 _AboutMe         | <p>Hi, I'm not really a person.</p>\n\n<p>I'm a... 
 _AccountId       | -1                                                 
 _CreationDate    | 2014-01-21 17:45:53.587                            
 _DisplayName     | Community                                          
 _DownVotes       | 503                                                
 _Id              | -1                                                 
 _LastAccessDate  | 2014-01-21 17:45:53.587                            
 _Location        | on the server farm                                 
 _ProfileImageUrl | null                                               
 _Reputation      | 1                                                  
 _UpVotes         | 2                                                  
 _Views           | 5                                                  
 _WebsiteUrl      | http://meta.stackexchange.com/              

### Tags

In [5]:
# Tags.xml columns as (name, Spark type) pairs; every field is nullable.
tags_schema = StructType([
    StructField(field_name, field_type, nullable=True)
    for field_name, field_type in (
        ("_Count", IntegerType()),
        ("_ExcerptPostId", IntegerType()),
        ("_Id", IntegerType()),
        ("_TagName", StringType()),
        ("_WikiPostId", IntegerType()),
    )
])

tags = (
    spark.read
    .format("xml")
    .option("rowTag", "row")
    .schema(tags_schema)
    .load("s3://raw-data-beer-and-wine/Tags.xml")
)
tags.show(n=5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+--------------+---+-----------+-----------+
|_Count|_ExcerptPostId|_Id|   _TagName|_WikiPostId|
+------+--------------+---+-----------+-----------+
|    17|          5062|  1|       hops|       5061|
|    85|          7872|  2|    history|       7871|
|    69|          4880|  4|    brewing|       4879|
|    37|          5109|  5|    serving|       5108|
|    31|           304|  6|temperature|        303|
+------+--------------+---+-----------+-----------+
only showing top 5 rows

### Votes

In [6]:
# Schema for Votes.xml. `_PostId` is read as IntegerType, matching every other
# post-id column in this notebook (PostLinks/PostHistory `_PostId`, Posts `_Id`).
# Reading it as a string would make joins on post ids depend on implicit casts
# and store the id as text in the parquet output.
votes_schema = StructType([
    StructField('_BountyAmount', IntegerType(), True),
    StructField('_CreationDate', TimestampType(), True),
    StructField('_Id', IntegerType(), True),
    StructField("_PostId", IntegerType(), True),
    StructField("_UserId", IntegerType(), True),
    StructField("_VoteTypeId", IntegerType(), True)
])

votes = (
    spark.read.format('xml')
    .options(rowTag='row')
    .schema(votes_schema)
    .load("s3://raw-data-beer-and-wine/Votes.xml")
)
votes.show(n=5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+-------------------+---+-------+-------+-----------+
|_BountyAmount|      _CreationDate|_Id|_PostId|_UserId|_VoteTypeId|
+-------------+-------------------+---+-------+-------+-----------+
|         null|2014-01-21 00:00:00|  1|      1|   null|          2|
|         null|2014-01-21 00:00:00|  2|      1|   null|          2|
|         null|2014-01-21 00:00:00|  3|      4|   null|          2|
|         null|2014-01-21 00:00:00|  4|      1|   null|          2|
|         null|2014-01-21 00:00:00|  5|      4|   null|          2|
+-------------+-------------------+---+-------+-------+-----------+
only showing top 5 rows

### Posts

In [7]:
# Posts.xml columns as (name, Spark type) pairs; every field is nullable.
posts_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("_AcceptedAnswerId", IntegerType()),
        ("_AnswerCount", IntegerType()),
        ("_Body", StringType()),
        ("_ClosedDate", TimestampType()),
        ("_CommentCount", IntegerType()),
        ("_CommunityOwnedDate", TimestampType()),
        ("_ContentLicense", StringType()),
        ("_CreationDate", TimestampType()),
        ("_FavoriteCount", IntegerType()),
        ("_Id", IntegerType()),
        ("_LastActivityDate", TimestampType()),
        ("_LastEditDate", TimestampType()),
        ("_LastEditorDisplayName", StringType()),
        ("_LastEditorUserId", IntegerType()),
        ("_OwnerDisplayName", StringType()),
        ("_OwnerUserId", IntegerType()),
        ("_ParentId", IntegerType()),
        ("_PostTypeId", IntegerType()),
        ("_Score", IntegerType()),
        ("_Tags", StringType()),
        ("_Title", StringType()),
        ("_ViewCount", IntegerType()),
    )
])

posts = (
    spark.read
    .format("xml")
    .option("rowTag", "row")
    .schema(posts_schema)
    .load("s3://raw-data-beer-and-wine/Posts.xml")
)
posts.show(n=1, vertical=True, truncate=50)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0--------------------------------------------------------------------
 _AcceptedAnswerId      | 4                                                  
 _AnswerCount           | 1                                                  
 _Body                  | <p>I was offered a beer the other day that was ... 
 _ClosedDate            | null                                               
 _CommentCount          | 0                                                  
 _CommunityOwnedDate    | null                                               
 _ContentLicense        | CC BY-SA 3.0                                       
 _CreationDate          | 2014-01-21 20:26:05.383                            
 _FavoriteCount         | null                                               
 _Id                    | 1                                                  
 _LastActivityDate      | 2014-01-21 22:04:34.977                            
 _LastEditDate          | 2014-01-21 22:04:34.977               

### Post links

In [10]:
# PostLinks.xml columns as (name, Spark type) pairs; every field is nullable.
links_schema = StructType([
    StructField(field_name, field_type, nullable=True)
    for field_name, field_type in (
        ("_CreationDate", TimestampType()),
        ("_Id", IntegerType()),
        ("_LinkTypeId", IntegerType()),
        ("_PostId", IntegerType()),
        ("_RelatedPostId", IntegerType()),
    )
])

links = (
    spark.read
    .format("xml")
    .option("rowTag", "row")
    .schema(links_schema)
    .load("s3://raw-data-beer-and-wine/PostLinks.xml")
)
links.show(n=5, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+---+-----------+-------+--------------+
|_CreationDate          |_Id|_LinkTypeId|_PostId|_RelatedPostId|
+-----------------------+---+-----------+-------+--------------+
|2014-01-21 21:04:25.23 |25 |3          |29     |25            |
|2014-01-21 21:42:09.103|89 |1          |83     |50            |
|2014-01-21 21:50:41.313|95 |1          |86     |2             |
|2014-01-21 22:07:35.783|101|3          |47     |99            |
|2014-01-21 22:13:51.38 |102|1          |74     |3             |
+-----------------------+---+-----------+-------+--------------+
only showing top 5 rows

### Post History

In [11]:
# PostHistory.xml columns as (name, Spark type) pairs; every field is nullable.
history_schema = StructType([
    StructField(field_name, field_type, nullable=True)
    for field_name, field_type in (
        ("_Comment", StringType()),
        ("_ContentLicense", StringType()),
        ("_CreationDate", TimestampType()),
        ("_Id", IntegerType()),
        ("_PostHistoryTypeId", IntegerType()),
        ("_PostId", IntegerType()),
        ("_RevisionGUID", StringType()),
        ("_Text", StringType()),
        ("_UserDisplayName", StringType()),
        ("_UserId", IntegerType()),
    )
])

history = (
    spark.read
    .format("xml")
    .option("rowTag", "row")
    .schema(history_schema)
    .load("s3://raw-data-beer-and-wine/PostHistory.xml")
)
history.show(n=2, vertical=True, truncate=50)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0----------------------------------------------------------------
 _Comment           | null                                               
 _ContentLicense    | CC BY-SA 3.0                                       
 _CreationDate      | 2014-01-21 20:26:05.383                            
 _Id                | 1                                                  
 _PostHistoryTypeId | 2                                                  
 _PostId            | 1                                                  
 _RevisionGUID      | a17002a0-00b0-417b-a404-0d8864bbbca5               
 _Text              | I was offered a beer the other day that was rep... 
 _UserDisplayName   | null                                               
 _UserId            | 7                                                  
-RECORD 1----------------------------------------------------------------
 _Comment           | null                                               
 _ContentLicense    | CC BY-SA 3.0    

### Badges

In [12]:
# Badges.xml columns as (name, Spark type) pairs; every field is nullable.
badges_schema = StructType([
    StructField(field_name, field_type, nullable=True)
    for field_name, field_type in (
        ("_Class", IntegerType()),
        ("_Date", TimestampType()),
        ("_Id", IntegerType()),
        ("_Name", StringType()),
        ("_TagBased", BooleanType()),
        ("_UserId", IntegerType()),
    )
])

badges = (
    spark.read
    .format("xml")
    .option("rowTag", "row")
    .schema(badges_schema)
    .load("s3://raw-data-beer-and-wine/Badges.xml")
)
badges.show(n=5, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+----------------------+---+--------------+---------+-------+
|_Class|_Date                 |_Id|_Name         |_TagBased|_UserId|
+------+----------------------+---+--------------+---------+-------+
|3     |2014-01-21 20:52:16.97|1  |Autobiographer|false    |1      |
|3     |2014-01-21 20:52:16.97|2  |Autobiographer|false    |2      |
|3     |2014-01-21 20:52:16.97|3  |Autobiographer|false    |6      |
|3     |2014-01-21 20:52:16.97|4  |Autobiographer|false    |7      |
|3     |2014-01-21 20:52:16.97|5  |Autobiographer|false    |9      |
+------+----------------------+---+--------------+---------+-------+
only showing top 5 rows

## Czyszczenie kolumn tekstowych

In [17]:
from pyspark.sql.functions import regexp_replace, trim, udf, col
from pyspark.sql.types import StringType

from bs4 import BeautifulSoup
from html import unescape  # no longer used by tags_remove; kept in case other cells rely on it

def tags_remove(s):
    """Strip HTML markup from ``s`` and return its plain-text content.

    The XML reader has already decoded one level of escaping (the loaded
    columns contain raw ``<p>...`` markup), and BeautifulSoup decodes HTML
    entities itself when producing text. The input is therefore NOT passed
    through ``html.unescape`` first. Doing so would turn escaped literal text
    such as ``&lt;strong&gt;`` into real tags, which would then be stripped.

    Parameters
    ----------
    s : str or None
        HTML fragment (e.g. a post body or an "about me" text).

    Returns
    -------
    str or None
        Text content of the fragment, or ``None`` when ``s`` is ``None``.
    """
    if s is None:
        return None
    return BeautifulSoup(s, 'lxml').text


# Spark UDF wrapper around tags_remove; the return type is declared explicitly
# (StringType is also udf's default, so the column type is unchanged).
udf_tags_remove = udf(tags_remove, StringType())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
def clean_html_column(df, src, dst):
    """Return ``df`` with a new column ``dst`` holding plain text from ``src``.

    Steps:
    1. Replace newline, tab and carriage-return characters with spaces.
    2. Strip HTML markup via ``udf_tags_remove``.
    3. Collapse runs of two or more whitespace characters into a single space.
    4. Trim leading/trailing whitespace.

    ``src`` itself is left unchanged. Regex patterns are raw strings so that
    ``\\s`` reaches the Java regex engine without triggering Python's
    invalid-escape warning.
    """
    return (
        df.withColumn(dst, regexp_replace(src, r"[\n\t\r]", " "))
        .withColumn(dst, udf_tags_remove(col(dst)))
        .withColumn(dst, regexp_replace(dst, r"\s{2,}", " "))
        .withColumn(dst, trim(dst))
    )


users_clean = clean_html_column(users, "_AboutMe", "_AboutMe_clean")

history_clean = clean_html_column(history, "_Text", "_Text_clean")

posts_clean = clean_html_column(posts, "_Body", "_Body_clean")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
users_clean.show(n=1, truncate=50, vertical=True)  # one record, one column per line

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0--------------------------------------------------------------
 _AboutMe         | <p>Hi, I'm not really a person.</p>\n\n<p>I'm a... 
 _AccountId       | -1                                                 
 _CreationDate    | 2014-01-21 17:45:53.587                            
 _DisplayName     | Community                                          
 _DownVotes       | 503                                                
 _Id              | -1                                                 
 _LastAccessDate  | 2014-01-21 17:45:53.587                            
 _Location        | on the server farm                                 
 _ProfileImageUrl | null                                               
 _Reputation      | 1                                                  
 _UpVotes         | 2                                                  
 _Views           | 5                                                  
 _WebsiteUrl      | http://meta.stackexchange.com/              

In [20]:
history_clean.show(n=1, truncate=50, vertical=True)  # one record, one column per line

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0----------------------------------------------------------------
 _Comment           | null                                               
 _ContentLicense    | CC BY-SA 3.0                                       
 _CreationDate      | 2014-01-21 20:26:05.383                            
 _Id                | 1                                                  
 _PostHistoryTypeId | 2                                                  
 _PostId            | 1                                                  
 _RevisionGUID      | a17002a0-00b0-417b-a404-0d8864bbbca5               
 _Text              | I was offered a beer the other day that was rep... 
 _UserDisplayName   | null                                               
 _UserId            | 7                                                  
 _Text_clean        | I was offered a beer the other day that was rep... 
only showing top 1 row

In [21]:
posts_clean.show(n=1, truncate=50, vertical=True)  # one record, one column per line

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0--------------------------------------------------------------------
 _AcceptedAnswerId      | 4                                                  
 _AnswerCount           | 1                                                  
 _Body                  | <p>I was offered a beer the other day that was ... 
 _ClosedDate            | null                                               
 _CommentCount          | 0                                                  
 _CommunityOwnedDate    | null                                               
 _ContentLicense        | CC BY-SA 3.0                                       
 _CreationDate          | 2014-01-21 20:26:05.383                            
 _FavoriteCount         | null                                               
 _Id                    | 1                                                  
 _LastActivityDate      | 2014-01-21 22:04:34.977                            
 _LastEditDate          | 2014-01-21 22:04:34.977               

## Zapis danych jako plik w formacie `parquet`

In [22]:
# Export users with snake_case column names to parquet (overwriting old output).
# NOTE(review): _AccountId is read but not exported — confirm this is intentional.
users_clean.select(
    col("_AboutMe").alias("about_me"),
    col("_AboutMe_clean").alias("about_me_clean"),
    col("_CreationDate").alias("creation_date"),
    col("_DisplayName").alias("display_name"),
    col("_DownVotes").alias("down_votes"),
    col("_Id").alias("id"),
    col("_LastAccessDate").alias("last_access_date"),
    col("_Location").alias("location"),
    col("_ProfileImageUrl").alias("profile_image_url"),
    col("_Reputation").alias("reputation"),  # was misspelled "reputatio"
    col("_UpVotes").alias("up_votes"),
    col("_Views").alias("views"),
    col("_WebsiteUrl").alias("website_url")
).write.mode('overwrite').parquet("s3://preprocessed-data-beer-and-wine/users")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# Rename raw XML attribute columns to snake_case and store them as parquet.
(
    tags
    .select(*[
        col(source).alias(target)
        for source, target in (
            ("_Count", "count"),
            ("_ExcerptPostId", "excerpt_post_id"),
            ("_Id", "id"),
            ("_TagName", "tag_name"),
            ("_WikiPostId", "wiki_post_id"),
        )
    ])
    .write
    .mode("overwrite")
    .parquet("s3://preprocessed-data-beer-and-wine/tags")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Rename raw XML attribute columns to snake_case and store them as parquet.
(
    votes
    .select(*[
        col(source).alias(target)
        for source, target in (
            ("_BountyAmount", "bounty_amount"),
            ("_CreationDate", "creation_date"),
            ("_Id", "id"),
            ("_PostId", "post_id"),
            ("_UserId", "user_id"),
            ("_VoteTypeId", "vote_type_id"),
        )
    ])
    .write
    .mode("overwrite")
    .parquet("s3://preprocessed-data-beer-and-wine/votes")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# Export posts with snake_case column names to parquet (overwriting old output).
# The license column is named "content_license", the same as in the history export.
posts_clean.select(
    col("_AcceptedAnswerId").alias("accepted_answer_id"),
    col("_AnswerCount").alias("answer_count"),
    col("_Body").alias("body"),
    col("_Body_clean").alias("body_clean"),
    col("_ClosedDate").alias("closed_date"),
    col("_CommentCount").alias("comment_count"),
    col("_CommunityOwnedDate").alias("community_owned_date"),
    col("_ContentLicense").alias("content_license"),
    col("_CreationDate").alias("creation_date"),
    col("_FavoriteCount").alias("favourite_count"),
    col("_Id").alias("id"),
    col("_LastActivityDate").alias("last_activity_date"),
    col("_LastEditDate").alias("last_edit_date"),
    col("_LastEditorDisplayName").alias("last_editor_display_name"),
    col("_LastEditorUserId").alias("last_editor_user_id"),
    col("_OwnerUserId").alias("owner_user_id"),
    col("_PostTypeId").alias("post_type_id"),
    col("_ParentId").alias("parent_id"),
    col("_Score").alias("score"),
    col("_Tags").alias("tags"),
    col("_Title").alias("title"),
    col("_ViewCount").alias("view_count"),
).write.mode('overwrite').parquet("s3://preprocessed-data-beer-and-wine/posts")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
# Rename raw XML attribute columns to snake_case and store them as parquet.
(
    links
    .select(*[
        col(source).alias(target)
        for source, target in (
            ("_CreationDate", "creation_date"),
            ("_Id", "id"),
            ("_LinkTypeId", "link_type_id"),
            ("_PostId", "post_id"),
            ("_RelatedPostId", "related_post_id"),
        )
    ])
    .write
    .mode("overwrite")
    .parquet("s3://preprocessed-data-beer-and-wine/post_links")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
# Export post history with snake_case column names to parquet (overwriting old output).
history_clean.select(
    col("_Comment").alias("comment"),
    col("_ContentLicense").alias("content_license"),
    col("_CreationDate").alias("creation_date"),
    col("_Id").alias("id"),
    col("_PostHistoryTypeId").alias("post_history_type_id"),
    col("_PostId").alias("post_id"),
    col("_RevisionGUID").alias("revision_guid"),
    col("_Text").alias("text"),
    col("_Text_clean").alias("text_clean"),
    col("_UserDisplayName").alias("user_display_name"),  # was misspelled "user_distplay_name"
    col("_UserId").alias("user_id"),
).write.mode('overwrite').parquet("s3://preprocessed-data-beer-and-wine/history")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# Rename raw XML attribute columns to snake_case and store them as parquet.
(
    badges
    .select(*[
        col(source).alias(target)
        for source, target in (
            ("_Class", "class"),
            ("_Date", "date"),
            ("_Id", "id"),
            ("_Name", "name"),
            ("_TagBased", "tag_based"),
            ("_UserId", "user_id"),
        )
    ])
    .write
    .mode("overwrite")
    .parquet("s3://preprocessed-data-beer-and-wine/badges")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…