In [19]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import pyspark.sql.functions as F

## Load from ES

reader = spark.read.format("org.elasticsearch.spark.sql")\
                .option("es.read.metadata", "true")\
                .option("es.nodes.wan.only","true")\
                .option("es.port","9200")\
                .option("es.net.ssl","false")\
                .option("es.nodes", "http://localhost")\

In [3]:
df = reader.load("just_testing_witcher")

In [4]:
df.show()

+--------+----------+------+--------------------+
|    name|profession|school|           _metadata|
+--------+----------+------+--------------------+
|  Geralt|   Witcher|  Wolf|[_index -> just_t...|
|   Letho|   Witcher| Viper|[_index -> just_t...|
|Yennefer|      Mage|  null|[_index -> just_t...|
| Jaskier|      Bard|  null|[_index -> just_t...|
|   Triss|      Mage|  null|[_index -> just_t...|
+--------+----------+------+--------------------+



## Load from ES with query

In [97]:
reader = spark.read.format("org.elasticsearch.spark.sql")\
                .option("es.read.metadata", "true")\
                .option("es.nodes.wan.only","true")\
                .option("es.port","9200")\
                .option("es.net.ssl","false")\
                .option("es.nodes", "http://localhost")\
                .option("es.query", """{"query": { "query_string": { "query": "*ag*" } } }""")

In [100]:
df = reader.load("just_testing_witcher")

In [101]:
df.show()

+--------+----------+------+--------------------+
|    name|profession|school|           _metadata|
+--------+----------+------+--------------------+
|Yennefer|      Mage|  null|[_index -> just_t...|
|   Triss|      Mage|  null|[_index -> just_t...|
+--------+----------+------+--------------------+



# Save to ES

### Reading MovieLens dataset

In [7]:
path = "./ml-latest-small/"

In [8]:
movies = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load(path + "movies.csv")

In [13]:
ratings = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load(path + "ratings.csv")


In [27]:
tags = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load(path + "tags.csv")

In [29]:
movies.createOrReplaceTempView("movies")
ratings.createOrReplaceTempView("ratings")
tags.createOrReplaceTempView("tags")

In [11]:
movies.show(3)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
+-------+--------------------+--------------------+
only showing top 3 rows



In [28]:
tags.show(3)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
+------+-------+---------------+----------+
only showing top 3 rows



In [30]:
ratings.show(3)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
+------+-------+------+---------+
only showing top 3 rows



### Let's create a movies with tags and genres in one table

In [34]:
tags_materialized = tags.groupBy('movieId').agg(F.collect_set('tag').alias('tags'))

In [36]:
tags_materialized.createOrReplaceTempView("tags_materialized")

In [35]:
tags_materialized.show(3)

+-------+--------------+
|movieId|          tags|
+-------+--------------+
|    471|   [hula hoop]|
|   1088|[music, dance]|
|   1580|      [aliens]|
+-------+--------------+
only showing top 3 rows



In [49]:
movies_materialized = movies.select(\
                                F.col("movieId"),
                                F.col("title"),
                                F.split(F.col("genres"), '\|').alias("genres")\
                            )

In [50]:
movies_materialized.createOrReplaceTempView("movies_materialized")

In [51]:
movies_materialized.show(3)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|[Adventure, Anima...|
|      2|      Jumanji (1995)|[Adventure, Child...|
|      3|Grumpier Old Men ...|   [Comedy, Romance]|
+-------+--------------------+--------------------+
only showing top 3 rows



In [52]:
movies_complete = spark.sql("""SELECT m.movieId, title, genres, tags
                                FROM movies_materialized m
                                LEFT JOIN tags_materialized t ON m.movieId = t.movieId
                               """)

In [53]:
movies_complete.show(3)

+-------+--------------------+--------------------+--------------------+
|movieId|               title|              genres|                tags|
+-------+--------------------+--------------------+--------------------+
|      1|    Toy Story (1995)|[Adventure, Anima...|        [pixar, fun]|
|      2|      Jumanji (1995)|[Adventure, Child...|[fantasy, game, m...|
|      3|Grumpier Old Men ...|   [Comedy, Romance]|        [old, moldy]|
+-------+--------------------+--------------------+--------------------+
only showing top 3 rows



### Save to ES

In [60]:
esURL = "localhost"

movies_complete.write\
  .format("org.elasticsearch.spark.sql")\
  .option("es.port","9200")\
  .option("es.net.ssl","false")\
  .option("es.nodes", esURL)\
  .mode("Overwrite")\
  .save("movielens")

### Let's prepare ratings with iso date

In [94]:
ratings_materialized = ratings.select(\
                                        F.col("movieId"),\
                                        F.col("userId"),\
                                        F.col("rating"),\
                                        F.col("timestamp").alias("datetime")\
                                    )

In [95]:
ratings_materialized.show(3)

+-------+------+------+---------+
|movieId|userId|rating| datetime|
+-------+------+------+---------+
|      1|     1|   4.0|964982703|
|      3|     1|   4.0|964981247|
|      6|     1|   4.0|964982224|
+-------+------+------+---------+
only showing top 3 rows



In [96]:
esURL = "localhost"

ratings_materialized.write\
  .format("org.elasticsearch.spark.sql")\
  .option("es.port","9200")\
  .option("es.net.ssl","false")\
  .option("es.nodes", esURL)\
  .mode("Overwrite")\
  .save("ratings")