<a href="https://colab.research.google.com/github/zalihat/wals-engine/blob/master/data_processing_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installing the necessary libraries
Run this cell below only once

---



In [154]:
# installing apache beam and its dependencies. After installions are done, restart your runtime - godwin
!pip install apache-beam
!pip install apache-beam[gcp]
!pip install gcsfs
!pip install apache-beam[interactive]



# Creating the pipeline

In [155]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import pandas as pd

beam.__version__


'2.24.0'

# Configuring pipeline options
Use the pipeline options to configure different aspects of your pipeline, such as the pipeline runner that will execute your pipeline and any runner-specific configuration required by the chosen runner. Your pipeline options will potentially include information such as your project ID or a location for storing files

 ## Setting PipelineOptions from command-line arguments

In [156]:
# added by godwin
class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        help='Input for the pipeline',
        default='https://storage.googleapis.com/coterie-rec/ratings_Movies_and_TV.csv')
    parser.add_argument(
        '--output',
        help='Output for the pipeline',
        default='gs://coterie-rec')

# To-do


1. Load the data as csv
2. cleaning the data - Nameing the columns, converting the dtypes, setting the headers to none, checking for null values
3. Check the unique values frot the users and items
3. Create the Matrix
5. Create the Sparse matrix

# Reading from an external source

In [157]:
input_file = 'gs://coterie-rec/ml-100k/u.data' 

In [158]:
# delete timestamp colum - Godwin
def del_col(data):
    del data['timestamp']
    return data
  
# check for null values - Godwin
def check_null(data):
    return len(data['userid']) > 0 and len(data['itemid']) > 0 and len(data['ratings']) > 0

# converting to comma deliminated format
def format_data(data):
  data = ','.join([data['userid'], data['itemid'], data['ratings']])
  return data


# The train,  test data

In [159]:
from scipy.sparse import coo_matrix
def _create_sparse_train_and_test(ratings, n_users, n_items, TEST_SET_RATIO):

  """Given ratings, create sparse matrices for train and test sets.
  Args:
    ratings:  list of ratings tuples  (u, i, r)
    n_users:  number of users
    n_items:  number of items
  Returns:
     train, test sparse matrices in scipy coo_matrix format.
  """
  # pick a test size
  test_set_size = int(len(ratings) * TEST_SET_RATIO)
  print('Test set size:{}'.format(test_set_size))
    
    # select indexes randomly for the test set
  np.random.seed(1)
  test_set_idx = np.random.choice(range(len(ratings)),
                                    size=test_set_size, replace=False)
  test_set_idx = sorted(test_set_idx)

    # sift ratings into train and test sets
  ts_ratings = ratings[test_set_idx]
  tr_ratings = np.delete(ratings, test_set_idx, axis=0)
    
    # create training and test matrices as coo_matrix's
  u_tr, i_tr, r_tr = zip(*tr_ratings)
  tr_sparse = coo_matrix((r_tr, (u_tr, i_tr)), shape=(n_users, n_items))

  u_ts, i_ts, r_ts = zip(*ts_ratings)
  test_sparse = coo_matrix((r_ts, (u_ts, i_ts)), shape=(n_users, n_items))

  return tr_sparse, test_sparse

In [192]:
import numpy as np
def create_train_test_set(data):
  users = np.array(data[0])
  items = np.array(data[1])
  unique_users = np.unique(users)
  unique_items = np.unique(items)
  n_users = unique_users.shape[0]
  n_items = unique_items.shape[0]
  max_user = unique_users[-1]
  max_item = unique_items[-1]
  if n_users != max_user or n_items != max_item:
    # make an array of 0-indexed unique user ids corresponding to the dataset
    # stack of user ids
    z = np.zeros(max_user+1, dtype=object)
    z[unique_users] = np.arange(n_users)
    u_r = z[users]

    # make an array of 0-indexed unique item ids corresponding to the dataset
    # stack of item ids
    z = np.zeros(max_item+1, dtype=int)
    z[unique_items] = np.arange(n_items)
    i_r = z[items]

    # construct the ratings set from the three stacks
    np_ratings = np.array(data[2])
    ratings = np.zeros((np_ratings.shape[0], 3), dtype=object)
    ratings[:, 0] = u_r
    ratings[:, 1] = i_r
    ratings[:, 2] = np_ratings
  else:
    ratings = np.array(data)
    # deal with 1-based user indices
    ratings[:, 0] -= 1
    ratings[:, 1] -= 1
    TEST_SET_RATIO = 0.1
    train_sparse, test_sparse = _create_sparse_train_and_test(ratings, n_users,n_items,TEST_SET_RATIO)
    return ratings[:, 0], ratings[:, 1], train_sparse, test_sparse





In [193]:
options = PipelineOptions() #- Godwin and zalihat
with beam.Pipeline(options=options) as pipeline:
  (pipeline | 'ReadData' >> beam.io.ReadFromText('gs://coterie-rec/ml-100k/u.data', skip_header_lines=0) # read data with beam
       | 'SplitData' >> beam.Map(lambda x: x.split('\t'))
       | 'FormatToDict' >> beam.Map(lambda x: {"userid": x[0], "itemid": x[1], "ratings": x[2], "timestamp": x[3]}) # format to dict and name columns
       | 'DeleteNullData' >> beam.Filter(check_null) # pick non null columns
       | 'DeleteUnwantedData' >> beam.Map(del_col) # delete irrelevant columns
       | 'FormatData' >> beam.Map(format_data)
       | 'CreateTrainTestSet' >> beam.Map(create_train_test_set)
       | 'writecsv' >> beam.io.WriteToText('result')) # write data to csv

TypeError: ignored

In [181]:
result = pd.read_csv('result-00000-of-00001', header = None, names = ["userid", "itemid", "ratings"])
result.head()

Unnamed: 0,userid,itemid,ratings
0,196,242,3
1,186,302,3
2,22,377,1
3,244,51,2
4,166,346,1


# Using Pandas to try things out
Note that we can only us the pandas to try things, pandas will not be used in building the pipeline

In [120]:
import pandas as pd
import numpy as np

In [142]:
df = pd.read_csv(input_file, header=None, sep = '\t' )
df.head()

Unnamed: 0,0,1,2,3
0,196,242,3,881250949
1,186,302,3,891717742
2,22,377,1,878887116
3,244,51,2,880606923
4,166,346,1,886397596


In [143]:
new_names = ['userId', 'itemId', 'rating', 'timestamp']
old_names = [0,1,2,3]
df = df.rename(columns=dict(zip(old_names, new_names)))
df.head()

Unnamed: 0,userId,itemId,rating,timestamp
0,196,242,3,881250949
1,186,302,3,891717742
2,22,377,1,878887116
3,244,51,2,880606923
4,166,346,1,886397596


In [None]:

n_users = len(df.userId.unique() )
unique_users = df.userId.unique()
print("Number of rows: {}".format(len(df.userId)))
print("Number of unque users: {}".format(n_users))
print(unique_users)




Number of rows: 100000
Number of unque users: 943
[196 186  22 244 166 298 115 253 305   6  62 286 200 210 224 303 122 194
 291 234 119 167 299 308  95  38 102  63 160  50 301 225 290  97 157 181
 278 276   7  10 284 201 287 246 242 249  99 178 251  81 260  25  59  72
  87  42 292  20  13 138  60  57 223 189 243  92 241 254 293 127 222 267
  11   8 162 279 145  28 135  32  90 216 250 271 265 198 168 110  58 237
  94 128  44 264  41  82 262 174  43  84 269 259  85 213 121  49 155  68
 172  19 268   5  80  66  18  26 130 256   1  56  15 207 232  52 161 148
 125  83 272 151  54  16  91 294 229  36  70  14 295 233 214 192 100 307
 297 193 113 275 219 218 123 158 302  23 296  33 154  77 270 187 170 101
 184 112 133 215  69 104 240 144 191  61 142 177 203  21 197 134 180 236
 263 109  64 114 239 117  65 137 257 111 285  96 116  73 221 235 164 281
 182 129  45 131 230 126 231 280 288 152 217  79  75 245 282  78 118 283
 171 107 226 306 173 185 150 274 188  48 311 165 208   2 205 248  93 159
 

In [144]:
from scipy.sparse import coo_matrix
def _create_sparse_train_and_test(ratings, n_users, n_items, TEST_SET_RATIO):

  """Given ratings, create sparse matrices for train and test sets.
  Args:
    ratings:  list of ratings tuples  (u, i, r)
    n_users:  number of users
    n_items:  number of items
  Returns:
     train, test sparse matrices in scipy coo_matrix format.
  """
  # pick a test size
  test_set_size = int(len(ratings) * TEST_SET_RATIO)
  print('Test set size:{}'.format(test_set_size))
    
    # select indexes randomly for the test set
  np.random.seed(1)
  test_set_idx = np.random.choice(range(len(ratings)),
                                    size=test_set_size, replace=False)
  test_set_idx = sorted(test_set_idx)

    # sift ratings into train and test sets
  ts_ratings = ratings[test_set_idx]
  tr_ratings = np.delete(ratings, test_set_idx, axis=0)
    
    # create training and test matrices as coo_matrix's
  u_tr, i_tr, r_tr = zip(*tr_ratings)
  tr_sparse = coo_matrix((r_tr, (u_tr, i_tr)), shape=(n_users, n_items))

  u_ts, i_ts, r_ts = zip(*ts_ratings)
  test_sparse = coo_matrix((r_ts, (u_ts, i_ts)), shape=(n_users, n_items))

  return tr_sparse, test_sparse

In [145]:
import numpy as np
def create_train_test_set(df):
  users = df.loc[:,'userId'].values
  items = df.loc[:,'itemId'].values
  unique_users = np.unique(users)
  unique_items = np.unique(items)
  n_users = unique_users.shape[0]
  n_items = unique_items.shape[0]
  max_user = unique_users[-1]
  max_item = unique_items[-1]
  if n_users != max_user or n_items != max_item:
    # make an array of 0-indexed unique user ids corresponding to the dataset
    # stack of user ids
    z = np.zeros(max_user+1, dtype=int)
    z[unique_users] = np.arange(n_users)
    u_r = z[users]

    # make an array of 0-indexed unique item ids corresponding to the dataset
    # stack of item ids
    z = np.zeros(max_item+1, dtype=int)
    z[unique_items] = np.arange(n_items)
    i_r = z[items]

    # construct the ratings set from the three stacks
    np_ratings = df.loc[:, 'rating'].values
    ratings = np.zeros((np_ratings.shape[0], 3), dtype=object)
    ratings[:, 0] = u_r
    ratings[:, 1] = i_r
    ratings[:, 2] = np_ratings
  else:
    ratings = df.loc[:,'userId':'rating'].values
    # deal with 1-based user indices
    ratings[:, 0] -= 1
    ratings[:, 1] -= 1
    TEST_SET_RATIO = 0.1
    train_sparse, test_sparse = _create_sparse_train_and_test(ratings, n_users,n_items,TEST_SET_RATIO)
    return ratings[:, 0], ratings[:, 1], train_sparse, test_sparse





In [146]:
create_train_test_set(df)

Test set size:10000


(array([195, 185,  21, ..., 275,  12,  11]),
 array([ 241,  301,  376, ..., 1089,  224,  202]),
 <943x1682 sparse matrix of type '<class 'numpy.int64'>'
 	with 90000 stored elements in COOrdinate format>,
 <943x1682 sparse matrix of type '<class 'numpy.int64'>'
 	with 10000 stored elements in COOrdinate format>)

AttributeError: ignored

Unnamed: 0,userId,itemId,rating,timestamp
0,195,241,3,881250949
1,185,301,3,891717742
2,21,376,1,878887116
3,243,50,2,880606923
4,165,345,1,886397596
...,...,...,...,...
99995,879,475,3,880175444
99996,715,203,5,879795543
99997,275,1089,1,874795795
99998,12,224,2,882399156


In [170]:
dic = {'a': [1,3,4],
       'b':[4,5,0]
       }

In [171]:
dic['a','b[']

KeyError: ignored

In [174]:
dic

{'a': [1, 3, 4], 'b': [4, 5, 0]}