-
Notifications
You must be signed in to change notification settings - Fork 6
/
data-transformation.py
342 lines (256 loc) · 12.2 KB
/
data-transformation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
import glob
import gzip
import sys
import tarfile
import urllib.request

from pyspark import SparkContext, SQLContext
from pyspark.sql import Window, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
sc = SparkContext()
sqlContext = SQLContext(sc)
def load_netflix_data(url, directory):
    '''
    Download the Netflix Prize dataset archive and extract it into 'directory'.

    Parameters
    ----------
    url : str
        URL to download Netflix data
    directory : str
        directory where downloaded data will be saved and extracted
    '''
    archive_path = directory + 'files.tar'
    # Stream the archive straight to disk (stdlib; no third-party HTTP client).
    urllib.request.urlretrieve(url, archive_path)
    # The original opened 'files.tar' from the CWD (not the file it wrote) and
    # never closed the handles; open the written archive and extract in place.
    with tarfile.open(archive_path) as archive:
        archive.extractall(path=directory)
def relationize_netflix_data(directory):
    '''
    A function used to relationize Netflix data using PySpark.

    The raw files interleave "movieID:" header lines with "userID,rating,date"
    lines; an expensive row-enumeration step is necessary so a window function
    can forward-fill each rating row with the movie ID of the preceding header.

    Parameters
    ----------
    directory : str
        a working directory where raw Netflix data resides

    Returns
    -------
    user_ratings : DataFrame
        PySpark dataframe with Netflix user ratings with columns:
        "movieID", "rating"
    '''
    # Load raw data
    rdd = sc.textFile(directory + 'combined*.txt')
    # Header lines look like "123:"; every other line is "userID,rating,date",
    # of which only the rating (second field) is kept here.
    rdd_structured = rdd.map(lambda x: Row(movieID=int(x.split(':')[0]), rating=None) if ':' in x else
                                       Row(movieID=None, rating=int(x.split(',')[1])))
    # Load to DataFrame with an explicit schema
    user_ratings_schema = StructType([StructField("movieID", IntegerType(), True),
                                      StructField("rating", IntegerType(), True)])
    user_ratings = sqlContext.createDataFrame(rdd_structured, schema=user_ratings_schema)
    # Enumerate rows so the window below has a deterministic ordering
    user_ratings = user_ratings.withColumn("index", monotonically_increasing_id())
    # Forward-fill missing movieID with the last non-null value seen.
    # Window.unboundedPreceding is the supported bound (the -sys.maxsize trick
    # is deprecated) and matches the usage in join_transform.
    window = Window.orderBy("index").rowsBetween(Window.unboundedPreceding, 0)
    fill_with = last(user_ratings['movieID'], True).over(window)
    user_ratings = user_ratings.withColumn('movieID', fill_with)
    # Drop the helper column and the header rows (their rating is null)
    user_ratings = user_ratings.drop('index') \
                               .na.drop()
    return user_ratings
def transform_netflix_data(directory):
    '''
    A function used to relationize the Netflix user ratings dataset and join it
    with the movies metadata. It drops irrelevant columns and creates a new
    column "company" populated with "netflix".

    Parameters
    ----------
    directory : str
        a working directory where raw Netflix data resides

    Returns
    -------
    netflix_data : DataFrame
        PySpark dataframe with columns: "rating", "year", "title", "company"
    '''
    # Relationize Netflix data.
    # BUGFIX: the original called transform_netflix_data(directory) here,
    # recursing into itself forever instead of calling the relationizer.
    user_ratings = relationize_netflix_data(directory)
    # Load movies metadata to DataFrame (the duplicate, immediately-overwritten
    # "Movie ID"/"Year"/"Title" schema from the original was dead code).
    movie_titles_schema = StructType([StructField("movieID", IntegerType(), True),
                                      StructField("year", IntegerType(), True),
                                      StructField("title", StringType(), True)])
    movie_titles = sqlContext.read.csv(directory + 'movie_titles.csv', header=False, schema=movie_titles_schema)
    # Join ratings with metadata on the movie ID
    netflix_data = user_ratings.join(movie_titles, "movieID", 'left')
    # Drop the join key, no longer needed downstream
    netflix_data = netflix_data.drop('movieID')
    # Add a column indicating the company
    netflix_data = netflix_data.withColumn("company", lit("netflix"))
    return netflix_data
def load_amazon_data(s3_input_path, directory,
                     files=("amazon_reviews_us_Video_v1_00.tsv.gz",
                            "amazon_reviews_us_Video_DVD_v1_00.tsv.gz",
                            "amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz")):
    '''
    A function that downloads the Amazon Reviews files from S3 into 'directory'
    and loads them into a single PySpark dataframe.

    The original definition was a SyntaxError (non-default parameter after a
    default one) and unioned the undefined names part_0/part_1/part_2; this
    version accumulates the union inside the loop and so handles any number of
    files. The default is a tuple to avoid the mutable-default-argument trap.

    Parameters
    ----------
    s3_input_path : str
        S3 path with Amazon Reviews data
    directory : str
        directory where downloaded data will be saved
    files : sequence of str
        list of files to download

    Returns
    -------
    amazon_data : DataFrame
        PySpark dataframe with original Amazon data
    '''
    amazon_data = None
    for file in files:
        # Download the gzipped TSV next to the rest of the working data
        local_path = directory + file
        urllib.request.urlretrieve(s3_input_path + file, local_path)
        # Spark reads .gz files transparently, so no manual gunzip step; the
        # original also passed raw file *content* to read.csv, which expects a path.
        part = sqlContext.read.csv(local_path, sep='\t', header=True)
        amazon_data = part if amazon_data is None else amazon_data.union(part)
    return amazon_data
def transform_amazon_data(amazon_data):
    '''
    A function used to extract the year of movie release from the movie title,
    drop irrelevant columns and create a new column "company" populated with
    "amazon".

    Parameters
    ----------
    amazon_data : DataFrame
        PySpark dataframe with original Amazon data

    Returns
    -------
    amazon_data : DataFrame
        PySpark dataframe with columns: "rating", "year", "title", "company"
    '''
    amazon_data = amazon_data.withColumn("company", lit("amazon"))
    # Parse the release year from a "(19xx)"/"(20xx)" suffix in the title.
    # Named year_pattern so it does not shadow pyspark.sql.functions.expr,
    # which the file imports via *.
    year_pattern = r"(\([1-2][0-9]+\))"
    year_found = regexp_extract(amazon_data["product_title"], year_pattern, 0)
    # Column[2:4] is substr(pos=2, len=4): the four digits inside the parentheses
    amazon_data = amazon_data.withColumn("year", year_found[2:4].cast(IntegerType()))
    # Filter columns relevant to the problem. cast() must come last: in the
    # original, .alias("rating").cast(...) let the cast discard the alias,
    # leaving a "CAST(...)" column name instead of "rating".
    amazon_data = amazon_data.select(col("star_rating").cast(IntegerType()).alias("rating"),
                                     col('year'),
                                     col("product_title").alias("title"),
                                     col('company'))
    return amazon_data
def join_transform(amazon_data, netflix_data, s3_output_path=None):
    '''
    A function used to join Amazon and Netflix data and perform a series of
    transforms so the same movies with differently formulated titles compare
    equal.

    - It removes the information about the movie/series part from the title,
      standardizes it and appends it to the end of the title.
    - It converts the title to lowercase, removes double spaces,
      trailing/leading spaces and punctuation.
    - It propagates the movie/series release year to records that lack it.

    Parameters
    ----------
    amazon_data : DataFrame
        PySpark dataframe with columns: "rating", "year", "title", "company"
    netflix_data : DataFrame
        PySpark dataframe with columns: "rating", "year", "title", "company"
    s3_output_path : str, optional
        if given, the final dataframe is also written there as Parquet.
        (The original referenced an undefined global of this name and raised
        NameError on every call; main() saves the result itself anyway.)

    Returns
    -------
    data : DataFrame
        PySpark dataframe with final Netflix and Amazon movie data with
        columns: "company", "title", "year", "rating"
    '''
    # Join datasets
    data = amazon_data.union(netflix_data)
    data = data.repartition(100)
    # Title to lowercase
    data = data.withColumn("cleanTitle", lower(col("title")))
    # Extract season/series/part number
    number = ('([1-9]([0-9]+)?|one|two|three|four|five|six|seven|eight|nine|ten|eleven|twelve|'
              'thirteen|fourteen|fifteen|sixteen|seventeen|eighteen|nineteen|twenty|'
              'first|second|third|fourth|fifth|sixth|seventh|eighth|nineth|tenth|'
              'eleventh|twelfth|thirteenth|fourteenth|fifteenth|sixteenth|seventeenth|'
              'eighteenth|nineteenth|twentieth)')
    char_words = '(season|part|series|vol(\w+)?)'
    after = '(?<=' + char_words + ' +)' + number
    before = number + '(?= +' + char_words + ')'
    # If it exists, extract the number after the characteristic word, otherwise
    # the number before it
    number_after = regexp_extract(data["cleanTitle"], after, 0)
    condition = number_after != ''
    number_before = regexp_extract(data["cleanTitle"], before, 0)
    data = data.withColumn("part", when(condition, number_after).otherwise(number_before))
    # Remove season/part/series info from the title itself
    remove_num_after = regexp_replace(data["cleanTitle"], char_words + ' ?' + after, '')
    remove_num_before = regexp_replace(data["cleanTitle"], before + ' ' + char_words , '')
    data = data.withColumn("cleanTitle", when(condition, remove_num_after).otherwise(remove_num_before))
    # Remove text starting from a parenthesis/bracket
    no_parenthesis = regexp_extract(data["cleanTitle"], r'(^[^(\(|\[)]+)', 0)
    data = data.withColumn("cleanTitle", no_parenthesis)
    # Uniformize the movie/series part indicator: map spelled-out cardinals and
    # ordinals to digits via a literal map + coalesce instead of the original
    # 20-branch when() chain. Unmapped values (already-numeric parts, empty
    # string) fall through unchanged, exactly like .otherwise(col('part')).
    # 'nineth' is kept as-is: it must match the (mis)spelling in `number` above.
    data.cache()
    part_words = {'one': '1', 'first': '1', 'two': '2', 'second': '2',
                  'three': '3', 'third': '3', 'four': '4', 'fourth': '4',
                  'five': '5', 'fifth': '5', 'six': '6', 'sixth': '6',
                  'seven': '7', 'seventh': '7', 'eight': '8', 'eighth': '8',
                  'nine': '9', 'nineth': '9', 'ten': '10', 'tenth': '10',
                  'eleven': '11', 'eleventh': '11', 'twelve': '12', 'twelfth': '12',
                  'thirteen': '13', 'thirteenth': '13', 'fourteen': '14', 'fourteenth': '14',
                  'fifteen': '15', 'fifteenth': '15', 'sixteen': '16', 'sixteenth': '16',
                  'seventeen': '17', 'seventeenth': '17', 'eighteen': '18', 'eighteenth': '18',
                  'nineteen': '19', 'nineteenth': '19', 'twenty': '20', 'twentieth': '20'}
    part_mapping = create_map([lit(x) for pair in part_words.items() for x in pair])
    data = data.withColumn('part', coalesce(part_mapping[col('part')], col('part')))
    # Add info about the series/movie part to the end of the string.
    # (The original did `concat = concat(...)`, clobbering the pyspark function.)
    title_with_part = concat(col('cleanTitle'), lit(' '), col('part'))
    data = data.withColumn('cleanTitle', title_with_part)
    # Remove punctuation
    no_punctuation = regexp_replace(data["cleanTitle"], r'([^\s\w_]|_)+', '')
    data = data.withColumn("cleanTitle", no_punctuation)
    # Remove double spaces
    no_double_spaces = regexp_replace(data["cleanTitle"], r' +', ' ')
    data = data.withColumn("cleanTitle", no_double_spaces)
    # Trim leading and trailing spaces
    data = data.withColumn("cleanTitle", trim(col("cleanTitle")))
    # Propagate the title release year within each cleaned title
    window = Window.partitionBy("cleanTitle").orderBy(col("year").desc()).rowsBetween(Window.unboundedPreceding, 0)
    fill_year = last(col("year"), True).over(window)
    data = data.withColumn("year", fill_year)
    data = data.select(col("company"), col('cleanTitle').alias('title'), col("year"), col("rating"))
    data = data.repartition(10)
    if s3_output_path is not None:
        data.write.format('parquet') \
            .option('header', 'true') \
            .mode('overwrite') \
            .save(s3_output_path + 'final_data.parquet')
    return data
def main():
    '''Run the full pipeline: download, transform, join and save the data.'''
    # Working directory for raw downloads and intermediate files
    directory = '/some_directory'
    # Load and transform Netflix data. All arguments are passed by keyword:
    # the original mixed a positional arg after keyword args, a SyntaxError.
    load_netflix_data(url='https://www.kaggle.com/netflix-inc/netflix-prize-data/download',
                      directory=directory)
    netflix_data = transform_netflix_data(directory)
    # Load and transform Amazon data
    amazon_data = load_amazon_data(s3_input_path='s3://amazon-reviews-pds/tsv/',
                                   files=["amazon_reviews_us_Video_v1_00.tsv.gz",
                                          "amazon_reviews_us_Video_DVD_v1_00.tsv.gz",
                                          "amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz"],
                                   directory=directory)
    amazon_data = transform_amazon_data(amazon_data)
    # Join both datasets and transform
    data = join_transform(amazon_data, netflix_data)
    # Save final dataset
    data = data.repartition(10)
    data.write.format('parquet') \
        .option('header', 'true') \
        .mode('overwrite') \
        .save('final_data.parquet')


if __name__ == "__main__":
    main()