In [1]:
from pyspark.sql.types import Row
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import explode
import json
import re
import pandas as pd



In [2]:
trending = sc.textFile("s3://waltrend/2017/03/*/*/*/")

In [3]:
def getcost(trends):
    
    try:
        
        trend = json.loads(trends)
        trend = trend.get('items')
        if not trend:
            return
        return [item['name'] for item in trend]
        
    except:
        return
    
        
    

In [4]:
trend_items = trending.map(lambda x: getcost(x))
trends = trend_items.map(lambda p: Row(name=p))


In [5]:
schemait = sqlContext.createDataFrame(trends)

In [6]:
schemait1 = trends.toDF()


In [7]:
schemait.printSchema()

root
 |-- name: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [8]:
schemait.show()

+--------------------+
|                name|
+--------------------+
|[Xbox One S 500GB...|
|[Straight Talk Pr...|
|[Nintendo Switch ...|
|[RCA Voyager 7" 1...|
|[RCA Voyager 7" 1...|
|[Titanfall 2 (Xbo...|
|[Titanfall 2 (Xbo...|
+--------------------+



In [9]:
items = schemait.select(explode(schemait['name']))
items.createOrReplaceTempView("demanded")

In [10]:
spark.sql("SELECT col,COUNT(*) AS `num` FROM demanded GROUP BY col ORDER BY num DESC").show(20,False)

+-------------------------------------------------------------------------------+---+
|col                                                                            |num|
+-------------------------------------------------------------------------------+---+
|Microsoft Xbox One Wireless Controller, White                                  |7  |
|Intex Queen 22" DuraBeam High Rise Airbed Mattress with Built-in Pump          |7  |
|Mainstays Albany Lane 6-Piece Folding Dining Set, Multiple Colors              |7  |
|Better Homes and Gardens 8-Cube Organizer, Multiple Colors                     |7  |
|Mainstays Twin Over Twin Wood Bunk Bed, Multiple Finishes                      |7  |
|Tramontina PrimaWare 18-Piece Nonstick Cookware Set                            |7  |
|Google Home                                                                    |6  |
|Spalding NBA 54" Polycarbonate Backboard                                       |6  |
|Baby Relax Sleigh Toddler Bed                        

In [11]:
demandDF = spark.sql("SELECT col,COUNT(*) AS `num` FROM demanded GROUP BY col ORDER BY num DESC")

In [12]:
items_pandas = demandDF.toPandas()

In [13]:
items_json = items_pandas.set_index('col').T.to_dict('int').get('num')

In [14]:
from pymongo import MongoClient
import json

client = MongoClient()
db = client.test

In [15]:
d = {}
for key,value in items_json.items():
    d[key.replace(".","_")] = value


In [16]:
result = db.item.insert_one(d)

In [17]:
result.inserted_id

ObjectId('58c14f8653e7bb305df84434')

In [23]:
cursor = db.item.find().sort()

TypeError: sort() takes at least 2 arguments (1 given)

In [19]:
cursor

<pymongo.cursor.Cursor at 0x7f69e40c5850>

In [20]:
for document in cursor:
    print(document)

{u'Mainstays Basic Student Desk, Multiple Colors': 3, u'Better Homes and Gardens 8-Cube Organizer, Multiple Colors': 7, u'Mainstays Twin Over Twin Wood Bunk Bed, Multiple Finishes': 7, u'Graco 4Ever All-in-1 Convertible Car Seat, Choose Your Pattern': 3, u'Hamilton Beach 0_7-cu_ ft_ Microwave Oven, Black': 1, u'Cosco Scenera NEXT Convertible Car Seat, Choose your Pattern': 2, u'Xbox One S Battlefield 1 Special Edition Bundle, Storm Grey (500GB)': 1, u'Aurora AS680S 6-Sheet Strip-Cut Paper/Credit Card Shredder without Wastebasket': 2, u'Microsoft Xbox One Wireless Controller, White': 7, u'Nintendo Switch Gaming Console with Neon Blue and Neon Red Joy-Con': 1, u'Straight Talk Prepaid Apple iPhone 5S 16GB CDMA Smartphone, Refurbished': 2, u'RCA 7" Tablet 16GB Quad Core': 2, u"Marvel's Doctor Strange (Blu-ray 3D + Blu-ray + DVD + Digital HD) (Widescreen)": 1, u'Spalding NBA 54" Polycarbonate Backboard': 6, u'11x14 Same-Day Photo Canvas': 1, u'iView SupraPad with WiFi 7" Touchscreen Tablet 

## Currency exchange data

In [20]:
currency = spark.read.json("s3://walcurrency/2017/03/*/*/*/")

In [21]:
currency.printSchema()

root
 |-- privacy: string (nullable = true)
 |-- quotes: struct (nullable = true)
 |    |-- USDAED: double (nullable = true)
 |    |-- USDAUD: double (nullable = true)
 |    |-- USDCAD: double (nullable = true)
 |    |-- USDCHF: double (nullable = true)
 |    |-- USDCNY: double (nullable = true)
 |    |-- USDEUR: double (nullable = true)
 |    |-- USDGBP: double (nullable = true)
 |    |-- USDINR: double (nullable = true)
 |    |-- USDJPY: double (nullable = true)
 |    |-- USDZAR: double (nullable = true)
 |-- source: string (nullable = true)
 |-- success: boolean (nullable = true)
 |-- terms: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [22]:
currency_df = currency.select(currency['timestamp'],currency['quotes.USDAED'],currency['quotes.USDAUD'],currency['quotes.USDCAD'],currency['quotes.USDCHF'],currency['quotes.USDCNY'],currency['quotes.USDEUR'],currency['quotes.USDGBP'],currency['quotes.USDINR'],currency['quotes.USDJPY'],currency['quotes.USDZAR'])

In [23]:
currency_pandas = currency_df.toPandas()

In [24]:
currency_pandas

Unnamed: 0,timestamp,USDAED,USDAUD,USDCAD,USDCHF,USDCNY,USDEUR,USDGBP,USDINR,USDJPY,USDZAR
0,2017-Mar-03,3.671984,1.320701,1.33857,1.012799,6.889801,0.951044,0.814301,66.759003,114.318001,13.149882
1,2017-Mar-04,3.672404,1.314904,1.337304,1.007104,6.895304,0.941304,0.81335,66.749001,114.004997,13.009804
2,2017-Mar-05,3.672404,1.314904,1.337304,1.007104,6.895304,0.941304,0.81335,66.749001,114.004997,13.009804
3,2017-Mar-07,3.671994,1.317008,1.34055,1.01244,6.895394,0.945502,0.817398,66.649002,113.991997,13.017701
4,2017-Mar-02,3.672037,1.305102,1.33486,1.00999,6.881797,0.948802,0.81441,66.699997,114.056999,13.016298
5,2017-Mar-08,3.672098,1.316602,1.34079,1.01302,6.899902,0.945899,0.81932,66.613998,113.904999,12.966099
6,2017-Mar-09,3.672296,1.328101,1.34927,1.01503,6.911303,0.948599,0.82197,66.753998,114.540001,13.139903
7,2017-Mar-06,3.671973,1.318804,1.33896,1.0084,6.895302,0.942019,0.81373,66.749001,113.763991,13.041402


## trending items at walmart and their discount rates calculated

In [27]:
trend_items = spark.read.json("s3://waltrend/2017/03/*/*/*/")

In [28]:
trend_items.printSchema()

root
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- addToCartUrl: string (nullable = true)
 |    |    |-- affiliateAddToCartUrl: string (nullable = true)
 |    |    |-- age: string (nullable = true)
 |    |    |-- attributes: struct (nullable = true)
 |    |    |    |-- actualColor: string (nullable = true)
 |    |    |    |-- finish: string (nullable = true)
 |    |    |    |-- pattern: string (nullable = true)
 |    |    |    |-- size: string (nullable = true)
 |    |    |    |-- theme: string (nullable = true)
 |    |    |-- availableOnline: boolean (nullable = true)
 |    |    |-- bestMarketplacePrice: struct (nullable = true)
 |    |    |    |-- availableOnline: boolean (nullable = true)
 |    |    |    |-- clearance: boolean (nullable = true)
 |    |    |    |-- offerType: string (nullable = true)
 |    |    |    |-- price: double (nullable = true)
 |    |    |    |-- sellerInfo: string (nullable = true)
 |    |    |    |-- stan

In [44]:
trend_df = trend_items.select(explode(trend_items['items.name']))

In [45]:
df1 = trend_items.select(explode(trend_items['items.msrp']))

In [46]:
df2 = trend_items.select(explode(trend_items['items.salePrice']))

In [47]:
df3 = trend_items.select(explode(trend_items['items.customerRating']))

In [48]:
cost_df = trend_df.toPandas()

In [49]:
cost_df['msrp'] = df1.toPandas()
cost_df['salePrice'] = df2.toPandas()
cost_df['customerRating'] = df3.toPandas()


In [50]:
cost_df = cost_df.dropna()

In [52]:
cost_df['discount'] = ((cost_df['msrp'] -  cost_df['salePrice'])/cost_df['msrp'])*100
cost_df

Unnamed: 0,col,msrp,salePrice,customerRating,discount
0,"RCA Voyager 7"" 16GB Tablet Android 6.0 (Marshm...",59.99,42.95,3.836,28.404734
2,"Proscan PLDED5069 50"" 1080p 60Hz LED HDTV",259.99,249.98,4.148,3.850148
6,Canon PIXMA MX490 Wireless Office All-in-One P...,59.97,39.00,3.93,34.967484
7,"Haier 5.0 cu ft Freezer, White",169.84,148.00,4.41,12.859162
8,"Microsoft Xbox One Wireless Controller, White",59.96,39.00,4.722,34.956638
9,"Intex Queen 22"" DuraBeam High Rise Airbed Matt...",74.97,54.99,4.353,26.650660
10,Mainstays Albany Lane 6-Piece Folding Dining S...,124.00,87.75,4.351,29.233871
11,Kolcraft Pediatric 800 Crib and Toddler Mattress,39.98,39.46,4.262,1.300650
12,"Graco 4Ever All-in-1 Convertible Car Seat, Cho...",348.40,299.99,4.844,13.894948
13,Call Of Duty Infinite Warfare (PS4),59.96,25.70,3.718,57.138092


## value of the day , most valuable item for the day combining it with the current currency rates and depicting the values

In [29]:
vod = spark.read.json("s3://walvod/2017/03/*/*/*/")

In [30]:
vod.printSchema()

root
 |-- addToCartUrl: string (nullable = true)
 |-- affiliateAddToCartUrl: string (nullable = true)
 |-- age: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- finish: string (nullable = true)
 |-- availableOnline: boolean (nullable = true)
 |-- bestMarketplacePrice: struct (nullable = true)
 |    |-- availableOnline: boolean (nullable = true)
 |    |-- clearance: boolean (nullable = true)
 |    |-- offerType: string (nullable = true)
 |    |-- price: double (nullable = true)
 |    |-- sellerInfo: string (nullable = true)
 |    |-- standardShipRate: double (nullable = true)
 |    |-- twoThreeDayShippingRate: double (nullable = true)
 |-- brandName: string (nullable = true)
 |-- bundle: boolean (nullable = true)
 |-- categoryNode: string (nullable = true)
 |-- categoryPath: string (nullable = true)
 |-- clearance: boolean (nullable = true)
 |-- color: string (nullable = true)
 |-- customerRating: string (nullable = true)
 |-- customerRatingImage: string (nul

In [31]:
vod_df = vod.select(vod['timestamp'],vod['name'],vod['salePrice'])

In [32]:
vod_pandas = vod_df.toPandas()

In [33]:
currency_pandas

Unnamed: 0,timestamp,USDAED,USDAUD,USDCAD,USDCHF,USDCNY,USDEUR,USDGBP,USDINR,USDJPY,USDZAR
0,2017-Mar-03,3.671984,1.320701,1.33857,1.012799,6.889801,0.951044,0.814301,66.759003,114.318001,13.149882
1,2017-Mar-04,3.672404,1.314904,1.337304,1.007104,6.895304,0.941304,0.81335,66.749001,114.004997,13.009804
2,2017-Mar-05,3.672404,1.314904,1.337304,1.007104,6.895304,0.941304,0.81335,66.749001,114.004997,13.009804
3,2017-Mar-07,3.671994,1.317008,1.34055,1.01244,6.895394,0.945502,0.817398,66.649002,113.991997,13.017701
4,2017-Mar-02,3.672037,1.305102,1.33486,1.00999,6.881797,0.948802,0.81441,66.699997,114.056999,13.016298
5,2017-Mar-08,3.672098,1.316602,1.34079,1.01302,6.899902,0.945899,0.81932,66.613998,113.904999,12.966099
6,2017-Mar-09,3.672296,1.328101,1.34927,1.01503,6.911303,0.948599,0.82197,66.753998,114.540001,13.139903
7,2017-Mar-06,3.671973,1.318804,1.33896,1.0084,6.895302,0.942019,0.81373,66.749001,113.763991,13.041402


In [67]:
vod_pandas

Unnamed: 0,timestamp,name,salePrice
0,2017-Mar-05,"Bostitch 18-Piece Tic Tac Box, BSA218DGM",7.2
1,2017-Mar-09,Knape and Vogt 6' White Closet Pole,9.76
2,2017-Mar-04,"Stanley Extendable Runner, Corner, 24"" x 24""",28.58
3,2017-Mar-06,Dremel 190 High Speed Cutter,3.34
4,2017-Mar-08,Vanity Top 2-Arm Guest Towel Holder (Build to ...,148.25
5,2017-Mar-03,"Bostitch 7"" Turbo (Diamond Blade), BSA4712M",9.99
6,2017-Mar-07,"RotoZip 1/8"" GP8 Guidepoint Drywall ZipBit, 8pk",11.18
7,2017-Mar-02,"Bostitch 12"" 6 Tpi Recip Blade, 3-Pack, BSA4804M",6.38


In [36]:
item_rates = currency_pandas.set_index('timestamp').join(vod_pandas.set_index('timestamp'))

In [37]:
item_rates

Unnamed: 0_level_0,USDAED,USDAUD,USDCAD,USDCHF,USDCNY,USDEUR,USDGBP,USDINR,USDJPY,USDZAR,name,salePrice
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
2017-Mar-03,3.671984,1.320701,1.33857,1.012799,6.889801,0.951044,0.814301,66.759003,114.318001,13.149882,"Bostitch 7"" Turbo (Diamond Blade), BSA4712M",9.99
2017-Mar-04,3.672404,1.314904,1.337304,1.007104,6.895304,0.941304,0.81335,66.749001,114.004997,13.009804,"Stanley Extendable Runner, Corner, 24"" x 24""",28.58
2017-Mar-05,3.672404,1.314904,1.337304,1.007104,6.895304,0.941304,0.81335,66.749001,114.004997,13.009804,"Bostitch 18-Piece Tic Tac Box, BSA218DGM",7.2
2017-Mar-07,3.671994,1.317008,1.34055,1.01244,6.895394,0.945502,0.817398,66.649002,113.991997,13.017701,"RotoZip 1/8"" GP8 Guidepoint Drywall ZipBit, 8pk",11.18
2017-Mar-02,3.672037,1.305102,1.33486,1.00999,6.881797,0.948802,0.81441,66.699997,114.056999,13.016298,"Bostitch 12"" 6 Tpi Recip Blade, 3-Pack, BSA4804M",6.38
2017-Mar-08,3.672098,1.316602,1.34079,1.01302,6.899902,0.945899,0.81932,66.613998,113.904999,12.966099,Vanity Top 2-Arm Guest Towel Holder (Build to ...,148.25
2017-Mar-09,3.672296,1.328101,1.34927,1.01503,6.911303,0.948599,0.82197,66.753998,114.540001,13.139903,Knape and Vogt 6' White Closet Pole,9.76
2017-Mar-06,3.671973,1.318804,1.33896,1.0084,6.895302,0.942019,0.81373,66.749001,113.763991,13.041402,Dremel 190 High Speed Cutter,3.34


In [38]:
item_rates_final = pd.DataFrame(item_rates['name'])
item_rates_final['value_INR'] =  item_rates['USDINR'] * item_rates['salePrice']
item_rates_final['value_EUR'] =  item_rates['USDEUR'] * item_rates['salePrice']
item_rates_final['value_GBP'] =  item_rates['USDGBP'] * item_rates['salePrice']
item_rates_final['value_JPY'] =  item_rates['USDJPY'] * item_rates['salePrice']
item_rates_final['value_USD'] =  item_rates['salePrice'] 
item_rates_final

Unnamed: 0_level_0,name,value_INR,value_EUR,value_GBP,value_JPY,value_USD
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2017-Mar-03,"Bostitch 7"" Turbo (Diamond Blade), BSA4712M",666.92244,9.50093,8.134867,1142.03683,9.99
2017-Mar-04,"Stanley Extendable Runner, Corner, 24"" x 24""",1907.686449,26.902468,23.245543,3258.262814,28.58
2017-Mar-05,"Bostitch 18-Piece Tic Tac Box, BSA218DGM",480.592807,6.777389,5.85612,820.835978,7.2
2017-Mar-07,"RotoZip 1/8"" GP8 Guidepoint Drywall ZipBit, 8pk",745.135842,10.570712,9.13851,1274.430526,11.18
2017-Mar-02,"Bostitch 12"" 6 Tpi Recip Blade, 3-Pack, BSA4804M",425.545981,6.053357,5.195936,727.683654,6.38
2017-Mar-08,Vanity Top 2-Arm Guest Towel Holder (Build to ...,9875.525203,140.229527,121.46419,16886.416102,148.25
2017-Mar-09,Knape and Vogt 6' White Closet Pole,651.51902,9.258326,8.022427,1117.91041,9.76
2017-Mar-06,Dremel 190 High Speed Cutter,222.941663,3.146343,2.717858,379.97173,3.34
