In [1]:
# findspark.init() must run before any pyspark import so pyspark can be located.
import findspark; findspark.init()
from os import environ
from pathlib import Path

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from pyspark.sql import functions as F
from pyspark.sql.window import Window

from aws_review_graph.commons import utils
from aws_review_graph.reviews import etl

In [None]:
# Run this cell before `utils.start_spark()`: PYSPARK_PYTHON has to be in the
# environment before the Spark session is created for it to take effect.
environ['DEBUG'] = "1"
# Path is relative to the notebook's working directory.
environ['PYSPARK_PYTHON'] = '.tox/dev/bin/python'

NUMBER_OF_BIN = 10
SEED = 1
# Apply the seed so any numpy-based sampling in this notebook is reproducible.
np.random.seed(SEED)

# NOTE(review): BOOK_DISTINCT / MUSIC_DISTINCT are not referenced by the cells
# below; negative sampling uses the distinct counts reported by
# `etl.review_agg` (4565 books / 3808 music) instead — confirm which is intended.
BOOK_DISTINCT = 112278
MUSIC_DISTINCT = 59092

In [2]:
# `utils.start_spark` hands back the Spark session together with a logger and the project settings.
spark_handles = utils.start_spark()
session, logger, settings = spark_handles


getting spark session


22/01/06 18:43:43 WARN Utils: Your hostname, oasis resolves to a loopback address: 127.0.1.1; using 192.168.86.47 instead (on interface wlp0s20f3)
22/01/06 18:43:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/01/06 18:43:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


spark session created


In [7]:
# Per-product window used to count the customers that reviewed each item.
w = Window.partitionBy('product_id')

# Minimum number of distinct positive reviewers an item needs to be kept.
MIN_BOOK_USERS = 100
MIN_MUSIC_USERS = 80


def _popular_positive_items(path: str, renamed_product_col: str, min_users: int):
    """Load positive (star_rating > 3) customer/product pairs from ``path``.

    Keeps products positively reviewed by at least ``min_users`` distinct
    customers, stores that count in ``users`` and renames ``product_id`` to
    ``renamed_product_col``.
    """
    return (
        session.read.parquet(path)
        .filter(F.col("star_rating") > 3)
        .select("customer_id", "product_id")
        .drop_duplicates()
        # Rows are unique per (customer_id, product_id) after drop_duplicates,
        # so a plain count over the product window is the exact number of
        # distinct customers. approx_count_distinct only estimated it and could
        # wrongly keep/drop items near the threshold.
        .withColumn('users', F.count('customer_id').over(w))
        .withColumnRenamed('product_id', renamed_product_col)
        .filter(F.col('users') >= min_users)
    )


books_df = _popular_positive_items("./data/books", 'book_product_id', MIN_BOOK_USERS)
music_df = _popular_positive_items("./data/music", 'music_product_id', MIN_MUSIC_USERS)

# Compute the dense cross-domain reviews only when no cached copy exists on
# disk, then always read the cached parquet back.
# Assumes Spark resolves the relative ./data path on the local filesystem.
DENSE_REVIEWS_PATH = "./data/dense_reviews"
if not Path(DENSE_REVIEWS_PATH).exists():
    # NOTE(review): meaning of the two `10` arguments is defined in
    # `etl.dense_cross_domains_reviews` — presumably per-domain minimums; verify.
    etl.dense_cross_domains_reviews(books_df, music_df, 10, 10).coalesce(5).write.parquet(
        DENSE_REVIEWS_PATH, mode="overwrite"
    )
dense_reviews = session.read.parquet(DENSE_REVIEWS_PATH)

In [8]:
# Show the schema of the cached dense reviews, then the summary row built by `etl.review_agg`.
dense_reviews.printSchema()

data_agg = etl.review_agg(dense_reviews)
data_agg.show()

root
 |-- customer_id: string (nullable = true)
 |-- book_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- music_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- books_count: integer (nullable = true)
 |-- music_count: integer (nullable = true)

+-----+-----------+-----------+-------------+-------------+
|users|total_books|total_music|distinc_books|distinc_music|
+-----+-----------+-----------+-------------+-------------+
|  437|       8515|       7405|         4565|         3808|
+-----+-----------+-----------+-------------+-------------+



In [6]:
# Index and bin each user's reviews. Compute and cache them only when no
# cached copy exists, then always read the cached parquet back.
INDEXED_AND_BINNED_PATH = "./data/indexed_and_binned_reviews"
if not Path(INDEXED_AND_BINNED_PATH).exists():
    etl.binned_indexed_user_reviews(dense_reviews).coalesce(1).write.parquet(
        INDEXED_AND_BINNED_PATH, mode="overwrite"
    )
indexed_and_binned_reviews = session.read.parquet(INDEXED_AND_BINNED_PATH)
indexed_and_binned_reviews.show()

+-----------+-----------------+-----------+-----------+--------------------+--------------------+--------------------+--------------------+---+
|customer_id|customer_id_index|books_count|music_count|            book_ids|     book_id_indexes|           music_ids|    music_id_indexes|bin|
+-----------+-----------------+-----------+-----------+--------------------+--------------------+--------------------+--------------------+---+
|   31808048|              100|         45|         11|[0307269981, 0307...|[661, 1555, 3652,...|[B0000C6E4D, B000...|[1678, 2008, 1682...|  6|
|   52216152|              372|         14|         10|[0553804723, 0875...|[2853, 234, 67, 3...|[B001NESPHC, B000...|[1783, 916, 2887,...|  5|
|   10952233|                6|         15|         12|[1607740036, 0064...|[901, 2789, 1716,...|[B00BIVN82M, B00M...|[3732, 3729, 1965...|  5|
|   24638959|               73|         15|         13|[0312536631, 0765...|[3006, 4343, 2346...|[B00008H2LB, B00E...|[3532, 3703, 2495.

In [None]:
# Build the music item graph only when no cached copy exists, then always
# read the cached parquet back.
MUSIC_GRAPH_PATH = "./data/music_graph"
if not Path(MUSIC_GRAPH_PATH).exists():
    etl.get_item_graph(indexed_and_binned_reviews, 'music_id_indexes').coalesce(1).write.parquet(
        MUSIC_GRAPH_PATH, mode="overwrite"
    )
music_graph = session.read.parquet(MUSIC_GRAPH_PATH)
music_graph.show()
# Summary statistics of the per-item neighbor counts.
music_graph.select('neighbors_size').summary().show()

In [10]:
# Attach negatives to each user's positives. Compute and cache them only when
# no cached copy exists, then always read the cached parquet back.
POSITIVE_NEGATIVE_PATH = "./data/positive_negative_reviews"
if not Path(POSITIVE_NEGATIVE_PATH).exists():
    # Take the distinct item counts from the `review_agg` summary (previously
    # hard-coded as 4565 / 3808) so they stay in sync with `dense_reviews`.
    # The `distinc_*` spelling matches the columns produced by `etl.review_agg`.
    agg_row = data_agg.first()
    etl.positive_negative_reviews(
        session,
        indexed_and_binned_reviews,
        book_distinct=agg_row['distinc_books'],
        music_distinct=agg_row['distinc_music'],
    ).coalesce(1).write.parquet(POSITIVE_NEGATIVE_PATH, mode="overwrite")
positive_negative_reviews = session.read.parquet(POSITIVE_NEGATIVE_PATH)
positive_negative_reviews.show()

root
 |-- customer_id: string (nullable = true)
 |-- customer_id_index: integer (nullable = true)
 |-- book_id_indexes: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- negative_books: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- music_id_indexes: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- negative_music: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- bin: integer (nullable = true)



                                                                                

+-----------+-----------------+--------------------+--------------------+--------------------+--------------------+---+
|customer_id|customer_id_index|     book_id_indexes|      negative_books|    music_id_indexes|      negative_music|bin|
+-----------+-----------------+--------------------+--------------------+--------------------+--------------------+---+
|   31808048|              100|[661, 1555, 3652,...|[3492, 1874, 2943...|[1678, 2008, 1682...|[2441, 1294, 3285...|  6|
|   52216152|              372|[2853, 234, 67, 3...|[1380, 2846, 837,...|[1783, 916, 2887,...|[1185, 600, 1044,...|  5|
|   10952233|                6|[901, 2789, 1716,...|[3967, 2273, 996,...|[3732, 3729, 1965...|[3214, 1151, 3063...|  5|
|   24638959|               73|[3006, 4343, 2346...|[3750, 41, 822, 3...|[3532, 3703, 2495...|[1693, 3695, 1222...|  2|
|   49944666|              267|[1852, 539, 1598,...|[2307, 800, 1254,...|[2745, 1809, 2716...|[2999, 3612, 1123...|  4|
|   51812938|              352|[2563, 32

In [4]:
# Explode each user's book positives/negatives into (positive, negative) rows.
# Compute and cache them only when no cached copy exists, then always read the
# cached parquet back.
PN_EXPLODE_PATH = "./data/user_positive_negative_explode"
if not Path(PN_EXPLODE_PATH).exists():
    etl.user_positive_negative_explode(
        positive_negative_reviews, 'customer_id_index', 'book_id_indexes', 'negative_books'
    ).coalesce(1).write.parquet(PN_EXPLODE_PATH, mode="overwrite")
pn_explode = session.read.parquet(PN_EXPLODE_PATH)
pn_explode.show()


+----+---+--------+--------+-----+
|user|bin|positive|negative|split|
+----+---+--------+--------+-----+
| 100|  6|     577|    3090|    1|
| 100|  6|    1523|    3291|    8|
| 100|  6|    1646|    3460|    8|
| 100|  6|    4281|    4132|    9|
| 100|  6|    1668|     985|    0|
| 100|  6|     530|    4500|    3|
| 100|  6|    2651|    4169|    6|
| 100|  6|    1152|     128|    4|
| 100|  6|    1152|     631|    4|
| 100|  6|      57|     177|    3|
|  73|  2|    3065|    1367|    2|
|  73|  2|    1129|    4067|    2|
| 267|  4|     539|    2941|    5|
| 267|  4|    2340|    2941|    8|
| 267|  4|    2864|    1030|    2|
| 267|  4|    2785|    1226|    0|
| 267|  4|    4434|    1030|    5|
| 267|  4|     748|    3048|    1|
| 267|  4|    3240|    3969|    4|
| 267|  4|    2341|     942|    0|
+----+---+--------+--------+-----+
only showing top 20 rows



In [12]:
# Attach each user's music positives/negatives to every exploded book
# (positive, negative) row, joined on the user index.
music_per_user = positive_negative_reviews.select(
    F.col('customer_id_index').alias('user'),
    'music_id_indexes',
    'negative_music',
)
dataset_bin_split = pn_explode.join(music_per_user, on=['user'])
dataset_bin_split.show()

bin_split_writer = dataset_bin_split.coalesce(1).write
bin_split_writer.parquet("./data/dataset_bin_split", mode="overwrite")

+----+---+--------+--------+-----+--------------------+--------------------+
|user|bin|positive|negative|split|    music_id_indexes|      negative_music|
+----+---+--------+--------+-----+--------------------+--------------------+
| 100|  6|     577|    3090|    1|[1678, 2008, 1682...|[2441, 1294, 3285...|
| 100|  6|    1523|    3291|    8|[1678, 2008, 1682...|[2441, 1294, 3285...|
| 100|  6|    1646|    3460|    8|[1678, 2008, 1682...|[2441, 1294, 3285...|
| 100|  6|    4281|    4132|    9|[1678, 2008, 1682...|[2441, 1294, 3285...|
| 100|  6|    1668|     985|    0|[1678, 2008, 1682...|[2441, 1294, 3285...|
| 100|  6|     530|    4500|    3|[1678, 2008, 1682...|[2441, 1294, 3285...|
| 100|  6|    2651|    4169|    6|[1678, 2008, 1682...|[2441, 1294, 3285...|
| 100|  6|    1152|     128|    4|[1678, 2008, 1682...|[2441, 1294, 3285...|
| 100|  6|    1152|     631|    4|[1678, 2008, 1682...|[2441, 1294, 3285...|
| 100|  6|      57|     177|    3|[1678, 2008, 1682...|[2441, 1294, 3285...|

                                                                                

In [None]:
# Load the joined dataset into pandas. Bin 0 becomes the validation set and
# bins > 0 the train/test pool; train/test assignment comes from the existing
# `split` column, so no extra random split is drawn here.
df = pq.read_table("./data/dataset_bin_split").to_pandas()

is_validation_bin = df['bin'] == 0
is_train_test_bin = df['bin'] > 0
validate = df.loc[is_validation_bin]
train_test_df = df.loc[is_train_test_bin]

train_test_df.count()