In [1]:
# getting data from the Google Cloud
!gsutil cp gs://yelp-dataset-bucket/* .

Copying gs://yelp-dataset-bucket/data-munging.ipynb...
Copying gs://yelp-dataset-bucket/getting-data.ipynb...                          
Copying gs://yelp-dataset-bucket/map-reduce.ipynb...                            
Copying gs://yelp-dataset-bucket/mapper.py...                                   
/ [4 files][ 26.6 KiB/ 26.6 KiB]                                                
==> NOTE: You are performing a sequence of gsutil operations that may
run significantly faster if you instead use gsutil -m cp ... Please
see the -m section under "gsutil help options" for further information
about when gsutil -m can be advantageous.

Copying gs://yelp-dataset-bucket/reducer.py...
Copying gs://yelp-dataset-bucket/spark-ml.ipynb...                              
Copying gs://yelp-dataset-bucket/working-with-hdfs.ipynb...                     
Copying gs://yelp-dataset-bucket/yelp_academic_dataset_review.json...           
/ [8 files][  5.9 GiB/  5.9 GiB]   51.7 MiB/s                                  

In [18]:
# you can download data from Kaggle directly using their package
# https://github.com/Kaggle/kaggle-api
!pip install kaggle

Collecting kaggle
  Downloading https://files.pythonhosted.org/packages/99/33/365c0d13f07a2a54744d027fe20b60dacdfdfb33bc04746db6ad0b79340b/kaggle-1.5.10.tar.gz (59kB)
[K    100% |████████████████████████████████| 61kB 6.1MB/s eta 0:00:01
Collecting tqdm (from kaggle)
  Downloading https://files.pythonhosted.org/packages/8a/54/115f0c28a61d56674c3a5e05c46d6c3523ad196e1dcd3e2d8b119026df36/tqdm-4.54.1-py2.py3-none-any.whl (69kB)
[K    100% |████████████████████████████████| 71kB 5.6MB/s eta 0:00:01
[?25hCollecting python-slugify (from kaggle)
  Downloading https://files.pythonhosted.org/packages/9f/42/e336f96a8b6007428df772d0d159b8eee9b2f1811593a4931150660402c0/python-slugify-4.0.1.tar.gz
Collecting text-unidecode>=1.3 (from python-slugify->kaggle)
  Downloading https://files.pythonhosted.org/packages/a6/a5/c0b6468d3824fe3fde30dbb5e1f687b291608f9473681bbf7dabbf5a87d7/text_unidecode-1.3-py2.py3-none-any.whl (78kB)
[K    100% |████████████████████████████████| 81kB 7.0MB/s eta 0:00:01
[

In [49]:
!zip -9 some.zip /home/borisshminke/some.model/*

  adding: home/borisshminke/some.model/metadata/ (stored 0%)
  adding: home/borisshminke/some.model/stages/ (stored 0%)


In [50]:
# if you have a zipped data, you should unzip it before uploading to HDFS
# storing zip-files on Googl Cloud is better
!unzip some.zip

Archive:  some.zip


In [2]:
# uploading your data to HDFS
!hdfs dfs -put yelp_academic_dataset_review.json /user/borisshminke

In [18]:
# reading your data, JSON and CSV are preferred if using Spark
data = (
    spark.read
    .json("/user/borisshminke/yelp_academic_dataset_review.json")
)

In [2]:
# feature engineering
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.regression import LinearRegression

pipeline = Pipeline(stages=[
    Tokenizer(inputCol="text", outputCol="words"),
    HashingTF(inputCol="words", outputCol="term_frequency"),
    IDF(inputCol="term_frequency", outputCol="features"),
    LinearRegression(labelCol="stars")
])

In [5]:
# your param grid, use at least two options
from pyspark.ml.tuning import ParamGridBuilder

param_grid = (
    ParamGridBuilder()
    .addGrid("regParam", [0])
    .build()
)

In [19]:
# use a small fraction of data for debug
# if running on all the data lasts forever, you can create a larger cluster
# or if you run out of credits, don't wory, send an working copy on sample
debug_data = data.sample(0.01).cache()

In [7]:
# you can use cross validation here, or split on train and test manually
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator

models = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=RegressionEvaluator(labelCol="stars")
).fit(debug_data)

In [55]:
# for a classification use a tangible metric
# http://spark.apache.org/docs/2.4.3/api/python/pyspark.ml.html#module-pyspark.ml.evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

MulticlassClassificationEvaluator(metricName="accuracy")

MulticlassClassificationEvaluator_42a29d5ede0a3a45f6b4

In [8]:
# reporting values for training set is not necessary
models.validationMetrics

[2.27781356143004]

In [9]:
# be sure to use the right metric:)
models.getEvaluator().getMetricName()

'rmse'

In [57]:
# the deadline is Jan 8th

In [12]:
# fitting model on all data (without splits)
some_model = pipeline.fit(debug_data)

In [37]:
some_model

PipelineModel_4bcf8c3f854bcd357c1b

In [39]:
# save a train model
some_model.write().overwrite().save("/user/borisshminke/some.model")

In [41]:
# check that the model was saved
!hdfs dfs -ls /user/borisshminke/some.model/stages

Found 4 items
drwxr-xr-x   - root hadoop          0 2020-12-17 14:05 /user/borisshminke/some.model/stages/0_Tokenizer_4e15be7484ded4f2fdff
drwxr-xr-x   - root hadoop          0 2020-12-17 14:05 /user/borisshminke/some.model/stages/1_HashingTF_4a109dbe8d4c890f79a0
drwxr-xr-x   - root hadoop          0 2020-12-17 14:05 /user/borisshminke/some.model/stages/2_IDF_438581dbe3d72dc01450
drwxr-xr-x   - root hadoop          0 2020-12-17 14:05 /user/borisshminke/some.model/stages/3_LinearRegression_4aa58c2169880984a486


In [43]:
# get the model from HDFS
!hdfs dfs -get /user/borisshminke/some.model /home/borisshminke/some.model

In [44]:
# uploading your model to Google Cloud Storage
!gsutil cp -r /home/borisshminke/some.model gs://yelp-dataset-bucket/

Copying file:///home/borisshminke/some.model/stages/2_IDF_438581dbe3d72dc01450/metadata/part-00000 [Content-Type=application/octet-stream]...
Copying file:///home/borisshminke/some.model/stages/2_IDF_438581dbe3d72dc01450/metadata/_SUCCESS [Content-Type=application/octet-stream]...
Copying file:///home/borisshminke/some.model/stages/2_IDF_438581dbe3d72dc01450/data/part-00000-848624a9-7100-4804-8e2c-44ccdd011e34-c000.snappy.parquet [Content-Type=application/octet-stream]...
Copying file:///home/borisshminke/some.model/stages/2_IDF_438581dbe3d72dc01450/data/_SUCCESS [Content-Type=application/octet-stream]...
/ [4 files][248.4 KiB/248.4 KiB]                                                
==> NOTE: You are performing a sequence of gsutil operations that may
run significantly faster if you instead use gsutil -m cp ... Please
see the -m section under "gsutil help options" for further information
about when gsutil -m can be advantageous.

Copying file:///home/borisshminke/some.model/stages/3_

In [17]:
# you can load the model which you saved previously
from pyspark.ml.pipeline import PipelineModel

some_model = PipelineModel.read().load("/user/borisshminke/some.model")

In [20]:
some_predictions = some_model.transform(debug_data)

In [22]:
from pyspark.ml.evaluation import RegressionEvaluator

RegressionEvaluator(labelCol="stars").evaluate(
    some_predictions
)

2.3824686010483127

In [26]:
debug_data.columns

['business_id',
 'cool',
 'date',
 'funny',
 'review_id',
 'stars',
 'text',
 'useful',
 'user_id']

In [42]:
print(debug_data.count())
print(debug_data.dropna().count())

79613
79613


In [38]:
# how to do scaling and prepare number columns for feeding a model
from pyspark.ml.feature import VectorAssembler, MinMaxScaler

pipeline = Pipeline(stages=[
    VectorAssembler(
        inputCols=["funny", "useful", "cool"],
        outputCol="pre_features"
    ),
    MinMaxScaler(inputCol="pre_features", outputCol="features")
])

In [40]:
(
    pipeline.fit(debug_data).transform(debug_data)
    .select("funny", "useful", "cool", "features", "pre_features")
).show()

+-----+------+----+--------------------+-------------+
|funny|useful|cool|            features| pre_features|
+-----+------+----+--------------------+-------------+
|    0|     0|   0|       [0.0,0.0,0.0]|    (3,[],[])|
|    1|     6|   1|[0.00584795321637...|[1.0,6.0,1.0]|
|    0|     0|   0|       [0.0,0.0,0.0]|    (3,[],[])|
|    1|     2|   0|[0.00584795321637...|[1.0,2.0,0.0]|
|    0|     1|   0|[0.0,0.0067114093...|[0.0,1.0,0.0]|
|    0|     0|   0|       [0.0,0.0,0.0]|    (3,[],[])|
|    0|     0|   0|       [0.0,0.0,0.0]|    (3,[],[])|
|    0|     0|   0|       [0.0,0.0,0.0]|    (3,[],[])|
|    0|     1|   0|[0.0,0.0067114093...|[0.0,1.0,0.0]|
|    0|     1|   0|[0.0,0.0067114093...|[0.0,1.0,0.0]|
|    0|     0|   1|[0.0,0.0,0.007194...|[0.0,0.0,1.0]|
|    0|     0|   1|[0.0,0.0,0.007194...|[0.0,0.0,1.0]|
|    0|     5|   0|[0.0,0.0335570469...|[0.0,5.0,0.0]|
|    0|     0|   0|       [0.0,0.0,0.0]|    (3,[],[])|
|    1|     1|   5|[0.00584795321637...|[1.0,1.0,5.0]|
|    0|   

In [46]:
debug_data.summary().toPandas()

Unnamed: 0,summary,business_id,cool,date,funny,review_id,stars,text,useful,user_id
0,count,79613,79613.0,79613,79613.0,79613,79613.0,79613,79613.0,79613
1,mean,,0.5793651790536721,,0.4679637747604034,,3.706228882217728,,1.3165814628264227,
2,stddev,,2.503847667428883,,2.1866375974104084,,1.489918562112281,,3.4152387486574987,
3,min,--1UhMGODdWsrMastO9DZw,0.0,2005-03-16 17:08:51,0.0,--0pfY3vQilgl20btE0fVQ,1.0,! ! ! BEST MASSAGE THERAPIST IN TOWN ! ! ! \nA...,0.0,---1lKK3aKOuomHnwAkAow
4,25%,,0.0,,0.0,,3.0,,0.0,
5,50%,,0.0,,0.0,,4.0,,0.0,
6,75%,,0.0,,0.0,,5.0,,1.0,
7,max,zzwicjPC9g246MK2M1ZFBA,139.0,2019-12-13 15:22:44,171.0,zzsSYtKmFzbg5as5n4LS_Q,5.0,（忘记照相了，也忘记菜名了...所以盗用了一些大家的图片）点了下面图上这几样小菜和面，味道感...,149.0,zzyrLRly27i2dQdsE4XdPg


In [55]:
debug_data.select("funny").rdd.take(10)

[Row(funny=0),
 Row(funny=1),
 Row(funny=0),
 Row(funny=1),
 Row(funny=0),
 Row(funny=0),
 Row(funny=0),
 Row(funny=0),
 Row(funny=0),
 Row(funny=0)]

In [53]:
# histograms are not ported yet to DataFrames API
# so you need to use RDDs
(
    debug_data.select("funny")
    .rdd.map(lambda row: row[0])
    .histogram(10)
)

([0.0,
  17.1,
  34.2,
  51.300000000000004,
  68.4,
  85.5,
  102.60000000000001,
  119.70000000000002,
  136.8,
  153.9,
  171],
 [79442, 133, 14, 12, 7, 3, 0, 0, 0, 2])

In [71]:
debug_data.select("business_id").distinct().count()

41062

In [74]:
# for categorical variables you can do one-hot encoding
from pyspark.ml.feature import OneHotEncoder, StringIndexer

pipeline = Pipeline(stages=[
    StringIndexer(inputCol="business_id", outputCol="category_id"),
    OneHotEncoder(inputCol="category_id", outputCol="one"),
    VectorAssembler(
        inputCols=["funny", "useful", "cool", "one"],
        outputCol="pre_features"
    ),

])

In [76]:
(
    pipeline.fit(debug_data).transform(debug_data)
    .select("pre_features")
    .limit(10)
).toPandas()

Unnamed: 0,pre_features
0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,"(1.0, 6.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,"(1.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
5,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
6,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
7,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
8,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
9,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
