# Data Wrangling

Data wrangling:
The task that lies between the data acquisition and exploratory data analysis is what most call as data wrangling. It is the process of formatting,  merging, grouping, concatenating etc. for the purpose of analysing  or making it ready for the modelling purpose

## Merging

In [1]:
import pandas as pd
import numpy as np
#Create data frame
left = pd.DataFrame({
         'id':[1,2,3,4,5],
         'Name': ['Alex', 'Amy', 'Allen', 'Alice', 'Ayoung'],
         'subject_id':['sub1','sub2','sub4','sub6','sub5']})
right = pd.DataFrame(
         {'id':[1,2,3,4,5],
         'Name': ['Billy', 'Brian', 'Bran', 'Bryce', 'Betty'],
         'subject_id':['sub2','sub4','sub3','sub6','sub5']})

In [2]:
left.head()

Unnamed: 0,id,Name,subject_id
0,1,Alex,sub1
1,2,Amy,sub2
2,3,Allen,sub4
3,4,Alice,sub6
4,5,Ayoung,sub5


In [3]:
right.head()

Unnamed: 0,id,Name,subject_id
0,1,Billy,sub2
1,2,Brian,sub4
2,3,Bran,sub3
3,4,Bryce,sub6
4,5,Betty,sub5


In [4]:
# merge based on one key
merged_id = pd.merge(left, right, how='inner', on='id', sort=True)
merged_id.head()

Unnamed: 0,id,Name_x,subject_id_x,Name_y,subject_id_y
0,1,Alex,sub1,Billy,sub2
1,2,Amy,sub2,Brian,sub4
2,3,Allen,sub4,Bran,sub3
3,4,Alice,sub6,Bryce,sub6
4,5,Ayoung,sub5,Betty,sub5


In [5]:
# merge based on multiple keys
merged_multiple = pd.merge(left,right,on=['id','subject_id'])
merged_multiple.head()

Unnamed: 0,id,Name_x,subject_id,Name_y
0,4,Alice,sub6,Bryce
1,5,Ayoung,sub5,Betty


In [6]:
# left join
merge_left = pd.merge(left, right, on='subject_id', how='left')
merge_left.head()

Unnamed: 0,id_x,Name_x,subject_id,id_y,Name_y
0,1,Alex,sub1,,
1,2,Amy,sub2,1.0,Billy
2,3,Allen,sub4,2.0,Brian
3,4,Alice,sub6,4.0,Bryce
4,5,Ayoung,sub5,5.0,Betty


In [7]:
# right join
merge_right = pd.merge(left, right, on='subject_id', how='right')
merge_right.head()

Unnamed: 0,id_x,Name_x,subject_id,id_y,Name_y
0,2.0,Amy,sub2,1,Billy
1,3.0,Allen,sub4,2,Brian
2,4.0,Alice,sub6,4,Bryce
3,5.0,Ayoung,sub5,5,Betty
4,,,sub3,3,Bran


In [8]:
# outer join
merge_outer = pd.merge(left, right, how='outer', on='subject_id')
merge_outer.head()

Unnamed: 0,id_x,Name_x,subject_id,id_y,Name_y
0,1.0,Alex,sub1,,
1,2.0,Amy,sub2,1.0,Billy
2,3.0,Allen,sub4,2.0,Brian
3,4.0,Alice,sub6,4.0,Bryce
4,5.0,Ayoung,sub5,5.0,Betty


In [9]:
# inner join
merge_inner = pd.merge(left, right, how='inner', on='subject_id')
merge_inner.head()

Unnamed: 0,id_x,Name_x,subject_id,id_y,Name_y
0,2,Amy,sub2,1,Billy
1,3,Allen,sub4,2,Brian
2,4,Alice,sub6,4,Bryce
3,5,Ayoung,sub5,5,Betty


In [10]:
from IPython.display import Image
from IPython.core.display import HTML 
Image(url= "https://data36.com/wp-content/uploads/2018/08/4-pandas-merge-inner-outer-left-right-1024x771.png", width=600, height=400)

## Grouping Data

Grouping data sets is a frequent need in data analysis where we need the result in terms of various groups present in the data set

In [11]:
ipl_data = {'Team': ['Riders', 'Riders', 'Devils', 'Devils', 'Kings',
         'kings', 'Kings', 'Kings', 'Riders', 'Royals', 'Royals', 'Riders'],
         'Rank': [1, 2, 2, 3, 3,4 ,1 ,1,2 , 4,1,2],
         'Year': [2014,2015,2014,2015,2014,2015,2016,2017,2016,2014,2015,2017],
         'Points':[876,789,863,673,741,812,756,788,694,701,804,690]}
df = pd.DataFrame(ipl_data)
df

Unnamed: 0,Team,Rank,Year,Points
0,Riders,1,2014,876
1,Riders,2,2015,789
2,Devils,2,2014,863
3,Devils,3,2015,673
4,Kings,3,2014,741
5,kings,4,2015,812
6,Kings,1,2016,756
7,Kings,1,2017,788
8,Riders,2,2016,694
9,Royals,4,2014,701


In [12]:
# group by one column
grouped_1 = df.groupby('Year')
for name,group in grouped_1:
    print(name)
    print(group)

2014
     Team  Rank  Year  Points
0  Riders     1  2014     876
2  Devils     2  2014     863
4   Kings     3  2014     741
9  Royals     4  2014     701
2015
      Team  Rank  Year  Points
1   Riders     2  2015     789
3   Devils     3  2015     673
5    kings     4  2015     812
10  Royals     1  2015     804
2016
     Team  Rank  Year  Points
6   Kings     1  2016     756
8  Riders     2  2016     694
2017
      Team  Rank  Year  Points
7    Kings     1  2017     788
11  Riders     2  2017     690


In [13]:
# group by one column
grouped_2 = df.groupby(['Team','Year'])
grouped_2.apply(lambda a: a[:])

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Team,Rank,Year,Points
Team,Year,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
Devils,2014,2,Devils,2,2014,863
Devils,2015,3,Devils,3,2015,673
Kings,2014,4,Kings,3,2014,741
Kings,2016,6,Kings,1,2016,756
Kings,2017,7,Kings,1,2017,788
Riders,2014,0,Riders,1,2014,876
Riders,2015,1,Riders,2,2015,789
Riders,2016,8,Riders,2,2016,694
Riders,2017,11,Riders,2,2017,690
Royals,2014,9,Royals,4,2014,701


In [14]:
# Aggregations
grouped_a = df.groupby('Team')
print(grouped_a['Points'].agg(np.mean))

Team
Devils    768.000000
Kings     761.666667
Riders    762.250000
Royals    752.500000
kings     812.000000
Name: Points, dtype: float64


In [15]:
# multiple Aggregations

print(grouped_a['Points'].agg([np.sum, np.mean, np.std]))

         sum        mean         std
Team                                
Devils  1536  768.000000  134.350288
Kings   2285  761.666667   24.006943
Riders  3049  762.250000   88.567771
Royals  1505  752.500000   72.831998
kings    812  812.000000         NaN


In [16]:
# Transformation
score = lambda x: (x - x.mean()) / x.std()*3
print(grouped_a.transform(score))

        Rank      Year    Points
0  -4.500000 -3.485685  3.852982
1   1.500000 -1.161895  0.906086
2  -2.121320 -2.121320  2.121320
3   2.121320  2.121320 -2.121320
4   3.464102 -3.273268 -2.582586
5        NaN       NaN       NaN
6  -1.732051  0.654654 -0.708128
7  -1.732051  2.618615  3.290715
8   1.500000  1.161895 -2.311789
9   2.121320 -2.121320 -2.121320
10 -2.121320  2.121320  2.121320
11  1.500000  3.485685 -2.447278


In [17]:
# Filtration
print(df.groupby('Team').filter(lambda x: len(x) >= 4))

      Team  Rank  Year  Points
0   Riders     1  2014     876
1   Riders     2  2015     789
8   Riders     2  2016     694
11  Riders     2  2017     690


## Concatenating Data

In [18]:
one = pd.DataFrame({
         'Name': ['Alex', 'Amy', 'Allen', 'Alice', 'Ayoung'],
         'subject_id':['sub1','sub2','sub4','sub6','sub5'],
         'Marks_scored':[98,90,87,69,78]},
         index=[1,2,3,4,5])
two = pd.DataFrame({
         'Name': ['Billy', 'Brian', 'Bran', 'Bryce', 'Betty'],
         'subject_id':['sub2','sub4','sub3','sub6','sub5'],
         'Marks_scored':[89,80,79,97,88]},
         index=[1,2,3,4,5])

In [19]:
one

Unnamed: 0,Name,subject_id,Marks_scored
1,Alex,sub1,98
2,Amy,sub2,90
3,Allen,sub4,87
4,Alice,sub6,69
5,Ayoung,sub5,78


In [20]:
two

Unnamed: 0,Name,subject_id,Marks_scored
1,Billy,sub2,89
2,Brian,sub4,80
3,Bran,sub3,79
4,Bryce,sub6,97
5,Betty,sub5,88


In [21]:
print(pd.concat([one,two]))

     Name subject_id  Marks_scored
1    Alex       sub1            98
2     Amy       sub2            90
3   Allen       sub4            87
4   Alice       sub6            69
5  Ayoung       sub5            78
1   Billy       sub2            89
2   Brian       sub4            80
3    Bran       sub3            79
4   Bryce       sub6            97
5   Betty       sub5            88


In [22]:
#concat along axis=1, columns
print(pd.concat([one,two],axis=1))

     Name subject_id  Marks_scored   Name subject_id  Marks_scored
1    Alex       sub1            98  Billy       sub2            89
2     Amy       sub2            90  Brian       sub4            80
3   Allen       sub4            87   Bran       sub3            79
4   Alice       sub6            69  Bryce       sub6            97
5  Ayoung       sub5            78  Betty       sub5            88


In [23]:
# append
print(one.append(two))

     Name subject_id  Marks_scored
1    Alex       sub1            98
2     Amy       sub2            90
3   Allen       sub4            87
4   Alice       sub6            69
5  Ayoung       sub5            78
1   Billy       sub2            89
2   Brian       sub4            80
3    Bran       sub3            79
4   Bryce       sub6            97
5   Betty       sub5            88


Reference:-
- [Tutorial Point - Merging](https://www.tutorialspoint.com/python_pandas/python_pandas_merging_joining.htm)
- [Tutorial Point - Grouping](https://www.tutorialspoint.com/python_pandas/python_pandas_groupby.htm)
- [Tutorial Point - concatenation](https://www.tutorialspoint.com/python_pandas/python_pandas_concatenation.htm)

# PySpark

Data sets used for this demonstration is log datasets from NASA Kennedy Space Center web server in Florida
- [Part-1](ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz)
- [Part-2](ftp://ita.ee.lbl.gov/traces/NASA_access_log_Aug95.gz)

Make sure both the files are in the same directory as this notebook.

In [24]:
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
    
sc = SparkContext()
sqlContext = SQLContext(sc)
spark = SparkSession(sc)

In [25]:
spark

In [26]:
sqlContext

<pyspark.sql.context.SQLContext at 0x17354f8f1c8>

In [27]:
import re

- [Regex](https://www.w3schools.com/python/python_regex.asp)
- [FindIter](https://www.tutorialspoint.com/How-do-we-use-re-finditer-method-in-Python-regular-expression)

In [28]:
# python example of how regular expressions can be used
s1 = 'Im searching for a spark in Pyspark'
pattern = 'spark'
for match in re.finditer(pattern, s1):
    s = match.start()
    e = match.end()
    print(match, 'String match "%s" at %d:%d' % (s1[s:e], s, e))

<re.Match object; span=(19, 24), match='spark'> String match "spark" at 19:24
<re.Match object; span=(30, 35), match='spark'> String match "spark" at 30:35


In [29]:
# Load and View the Dataset using `sqlContext.read.text()` or `spark.read.text()`
import glob

raw_data_files = glob.glob('*.gz')
raw_data_files

['NASA_access_log_Aug95.gz', 'NASA_access_log_Jul95.gz']

In [30]:
base_df = spark.read.text(raw_data_files)
base_df.printSchema()

root
 |-- value: string (nullable = true)



In [31]:
type(base_df)

pyspark.sql.dataframe.DataFrame

Converting data frame to RDD - Just to show the original data structure of spark

[What is RDD?](https://databricks.com/glossary/what-is-rdd#:~:text=RDD%20was%20the%20primary%20user,that%20offers%20transformations%20and%20actions.)

In [32]:
base_df_rdd = base_df.rdd
type(base_df_rdd)

pyspark.rdd.RDD

In [33]:
#view data
base_df.show(10, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------+
|199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245                                 |
|unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985                      |
|199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085   |
|burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0               |
|199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179|
|burger.letters.com - - 

In [34]:
base_df_rdd.take(10)

[Row(value='199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245'),
 Row(value='unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985'),
 Row(value='199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085'),
 Row(value='burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0'),
 Row(value='199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179'),
 Row(value='burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0'),
 Row(value='burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0'),
 Row(value='205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985'),
 Row(value='d104.aa.net - - [01/Ju

Above log data [which is similar to web server logs](https://www.w3.org/Daemon/User/Config/Logging.html#common-logfile-format) is clearly semi structured and needs some processing and wrangling to make it useful for modelling purpose

Let's clean and parse our log dataset to extract structured attributes with meaningful information from each log message.

Data has to be parsed into individual columns. Special built-in [regexp\_extract()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.regexp_extract)
function can do the parsing. This function matches a column against a regular expression with one or more [capture groups](http://regexone.com/lesson/capturing_groups) and allows extraction of the matched groups. One regular expression is used for each field to extract

In [35]:
# look at our dataset dimensions
#print((base_df.count(), len(base_df.columns)))
print("(3461613, 1)")

(3461613, 1)


In [36]:
sample_logs = [item['value'] for item in base_df.take(15)]
sample_logs

['199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245',
 'unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985',
 '199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085',
 'burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0',
 '199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179',
 'burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0',
 'burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0',
 '205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985',
 'd104.aa.net - - [01/Jul/1995:00:00:13 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985',
 '129.94.144.152 - - [01/Jul/

In [37]:
# Extracting hostnames
# Regular expressions to extract the hostname from the logs:
host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
hosts = [re.search(host_pattern, item).group(1)
           if re.search(host_pattern, item)
           else 'no match'
           for item in sample_logs]
hosts

['199.72.81.55',
 'unicomp6.unicomp.net',
 '199.120.110.21',
 'burger.letters.com',
 '199.120.110.21',
 'burger.letters.com',
 'burger.letters.com',
 '205.212.115.106',
 'd104.aa.net',
 '129.94.144.152',
 'unicomp6.unicomp.net',
 'unicomp6.unicomp.net',
 'unicomp6.unicomp.net',
 'd104.aa.net',
 'd104.aa.net']

In [38]:
# Extracting timestamps
ts_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'
timestamps = [re.search(ts_pattern, item).group(1) for item in sample_logs]
timestamps

['01/Jul/1995:00:00:01 -0400',
 '01/Jul/1995:00:00:06 -0400',
 '01/Jul/1995:00:00:09 -0400',
 '01/Jul/1995:00:00:11 -0400',
 '01/Jul/1995:00:00:11 -0400',
 '01/Jul/1995:00:00:12 -0400',
 '01/Jul/1995:00:00:12 -0400',
 '01/Jul/1995:00:00:12 -0400',
 '01/Jul/1995:00:00:13 -0400',
 '01/Jul/1995:00:00:13 -0400',
 '01/Jul/1995:00:00:14 -0400',
 '01/Jul/1995:00:00:14 -0400',
 '01/Jul/1995:00:00:14 -0400',
 '01/Jul/1995:00:00:15 -0400',
 '01/Jul/1995:00:00:15 -0400']

In [39]:
# Extracting HTTP request method, URIs, and protocol

method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
method_uri_protocol = [re.search(method_uri_protocol_pattern, item).groups()
               if re.search(method_uri_protocol_pattern, item)
               else 'no match'
              for item in sample_logs]
method_uri_protocol

[('GET', '/history/apollo/', 'HTTP/1.0'),
 ('GET', '/shuttle/countdown/', 'HTTP/1.0'),
 ('GET', '/shuttle/missions/sts-73/mission-sts-73.html', 'HTTP/1.0'),
 ('GET', '/shuttle/countdown/liftoff.html', 'HTTP/1.0'),
 ('GET', '/shuttle/missions/sts-73/sts-73-patch-small.gif', 'HTTP/1.0'),
 ('GET', '/images/NASA-logosmall.gif', 'HTTP/1.0'),
 ('GET', '/shuttle/countdown/video/livevideo.gif', 'HTTP/1.0'),
 ('GET', '/shuttle/countdown/countdown.html', 'HTTP/1.0'),
 ('GET', '/shuttle/countdown/', 'HTTP/1.0'),
 ('GET', '/', 'HTTP/1.0'),
 ('GET', '/shuttle/countdown/count.gif', 'HTTP/1.0'),
 ('GET', '/images/NASA-logosmall.gif', 'HTTP/1.0'),
 ('GET', '/images/KSC-logosmall.gif', 'HTTP/1.0'),
 ('GET', '/shuttle/countdown/count.gif', 'HTTP/1.0'),
 ('GET', '/images/NASA-logosmall.gif', 'HTTP/1.0')]

In [40]:
# Extracting HTTP status codes

status_pattern = r'\s(\d{3})\s'
status = [re.search(status_pattern, item).group(1) for item in sample_logs]
print(status)

['200', '200', '200', '304', '200', '304', '200', '200', '200', '200', '200', '200', '200', '200', '200']


In [41]:
# Extracting HTTP response content size

content_size_pattern = r'\s(\d+)$'
content_size = [re.search(content_size_pattern, item).group(1) for item in sample_logs]
print(content_size)

['6245', '3985', '4085', '0', '4179', '0', '0', '3985', '3985', '7074', '40310', '786', '1204', '40310', '786']


In [42]:
# Putting it all together

# regexp_extract(...) method to build our DataFrame with all of the log attributes neatly extracted in their own separate columns.

from pyspark.sql.functions import regexp_extract

logs_df = base_df.select(regexp_extract('value', host_pattern, 1).alias('host'),
                         regexp_extract('value', ts_pattern, 1).alias('timestamp'),
                         regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
                         regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
                         regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
                         regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
                         regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))
logs_df.show(10, truncate=True)


+--------------------+--------------------+------+--------------------+--------+------+------------+
|                host|           timestamp|method|            endpoint|protocol|status|content_size|
+--------------------+--------------------+------+--------------------+--------+------+------------+
|        199.72.81.55|01/Jul/1995:00:00...|   GET|    /history/apollo/|HTTP/1.0|   200|        6245|
|unicomp6.unicomp.net|01/Jul/1995:00:00...|   GET| /shuttle/countdown/|HTTP/1.0|   200|        3985|
|      199.120.110.21|01/Jul/1995:00:00...|   GET|/shuttle/missions...|HTTP/1.0|   200|        4085|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/shuttle/countdow...|HTTP/1.0|   304|           0|
|      199.120.110.21|01/Jul/1995:00:00...|   GET|/shuttle/missions...|HTTP/1.0|   200|        4179|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/images/NASA-logo...|HTTP/1.0|   304|           0|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/shuttle/countdow...|HTTP/1.0|   200|    

## Reference:-

 - To know more and wrangle data via Pyspark refer this [link](https://opensource.com/article/19/5/log-data-apache-spark)