In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

In [2]:
# Create the Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("testing filter with multiple conditions")
    .getOrCreate()
)

In [3]:
# Location of the small Sparkify event log, read as JSON into a DataFrame.
path = "data/sparkify_log_small.json"
user_log = spark.read.format("json").load(path)

In [4]:
# Drop every record that has a null in either the userId or the sessionId field,
# then count what is left.
user_log_dropna = user_log.na.drop(how="any", subset=["userId", "sessionId"])
user_log_dropna.count()

10000

In [5]:
# The null filter above removed nothing, so also exclude rows whose userId is an
# empty string and count the remaining rows.
user_log_valid = user_log_dropna.where(user_log_dropna.userId != "")
user_log_valid.count()

9664

In [6]:
# Find a user who downgraded their service: keep only the 'Submit Downgrade' page
# events and show who triggered them.
(
    user_log_valid
    .where("page = 'Submit Downgrade'")
    .select("firstname", "userid")
    .show()
)

+---------+------+
|firstname|userid|
+---------+------+
|    Kelly|  1138|
+---------+------+



In [7]:
# userid of Kelly, the user returned by the 'Submit Downgrade' lookup above.
kelly_userid = 1138

In [8]:
# Show a few more interesting columns for Kelly's events, ordered by timestamp.
# The filter references the column on user_log_valid (the frame being queried) rather
# than on its ancestor user_log, so the condition is bound to the same DataFrame.
(
    user_log_valid
    .select(['userid', 'firstname', 'page', 'level', 'song', 'ts'])
    .where(user_log_valid['userid'] == kelly_userid)
    .sort('ts')
    .show()
)

+------+---------+--------+-----+--------------------+-------------+
|userid|firstname|    page|level|                song|           ts|
+------+---------+--------+-----+--------------------+-------------+
|  1138|    Kelly|    Home| paid|                null|1513729066284|
|  1138|    Kelly|NextSong| paid| Everybody Everybody|1513729066284|
|  1138|    Kelly|NextSong| paid|               Gears|1513729313284|
|  1138|    Kelly|NextSong| paid|        Use Somebody|1513729552284|
|  1138|    Kelly|NextSong| paid|Love Of My Life (...|1513729783284|
|  1138|    Kelly|NextSong| paid|Down In The Valle...|1513730001284|
|  1138|    Kelly|NextSong| paid|Treat Her Like A ...|1513730263284|
|  1138|    Kelly|NextSong| paid|Everybody Thinks ...|1513730518284|
|  1138|    Kelly|NextSong| paid|      Fourteen Wives|1513730768284|
|  1138|    Kelly|NextSong| paid|   Love On The Rocks|1513731182284|
|  1138|    Kelly|NextSong| paid|           Breakeven|1513731435284|
|  1138|    Kelly|NextSong| paid| 

# final goal is to create a new column for a particular user based on these special events
# let's call this column, "phase"

In [9]:
# UDF that flags downgrade events: returns 1 when the page value is
# 'Submit Downgrade' and 0 for any other value.
# IntegerType comes from the import cell at the top of the notebook, so it is not
# re-imported here.
flag_downgrade_event = udf(
    lambda page: 1 if page == 'Submit Downgrade' else 0,
    IntegerType(),
)

In [10]:
# Apply the UDF to the 'page' column and store the result as a new 'downgraded' column.
user_log_valid = user_log_valid.withColumn(
    'downgraded',
    flag_downgrade_event(user_log_valid['page']),
)

In [11]:
# Kelly's events in timestamp order, including the new 'downgraded' flag.
(
    user_log_valid
    .select('userid', 'firstname', 'ts', 'page', 'level', 'downgraded')
    .filter(user_log_valid['userid'] == kelly_userid)
    .orderBy('ts')
    .show()
)

+------+---------+-------------+--------+-----+----------+
|userid|firstname|           ts|    page|level|downgraded|
+------+---------+-------------+--------+-----+----------+
|  1138|    Kelly|1513729066284|    Home| paid|         0|
|  1138|    Kelly|1513729066284|NextSong| paid|         0|
|  1138|    Kelly|1513729313284|NextSong| paid|         0|
|  1138|    Kelly|1513729552284|NextSong| paid|         0|
|  1138|    Kelly|1513729783284|NextSong| paid|         0|
|  1138|    Kelly|1513730001284|NextSong| paid|         0|
|  1138|    Kelly|1513730263284|NextSong| paid|         0|
|  1138|    Kelly|1513730518284|NextSong| paid|         0|
|  1138|    Kelly|1513730768284|NextSong| paid|         0|
|  1138|    Kelly|1513731182284|NextSong| paid|         0|
|  1138|    Kelly|1513731435284|NextSong| paid|         0|
|  1138|    Kelly|1513731695284|NextSong| paid|         0|
|  1138|    Kelly|1513731857284|NextSong| paid|         0|
|  1138|    Kelly|1513732160284|NextSong| paid|         

# I want to see only Kelly's events when her level was NOT 'paid' (i.e. when it was 'free')

How do I write it?
Take the query above and add one more condition...?

## attempt 1, using `and` :

In [12]:
# Kept as a failing example: this raises ValueError ("Cannot convert column into bool").
# Python's `and` needs a plain truth value from each operand, and a Column expression
# cannot provide one; the error message itself says to use `&` instead.
user_log_valid.select(['userid', 'firstname', 'ts', 'page', 'level', 'downgraded']).where(user_log_valid['userid'] == kelly_userid and user_log_valid['level'] == 'free').sort('ts').show()

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

## attempt 2, using `&` :

In [13]:
# Kept as a failing example: this raises Py4JError. In Python `&` binds more tightly
# than `==`, so without parentheses the condition is grouped as
# `... == (kelly_userid & user_log_valid['level']) == 'free'`, and the integer ends up
# as the argument of the Column `and` call reported in the trace.
# Wrapping each comparison in its own parentheses, `(a == b) & (c == d)`, avoids this.
user_log_valid.select(['userid', 'firstname', 'ts', 'page', 'level', 'downgraded']).where(user_log_valid['userid'] == kelly_userid & user_log_valid['level'] == 'free').sort('ts').show()

Py4JError: An error occurred while calling o107.and. Trace:
py4j.Py4JException: Method and([class java.lang.Integer]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)



## attempt 3, using `&&` :

In [14]:
# Kept as a failing example: `&&` is not a Python operator, so this line is rejected
# with a SyntaxError before any Spark code runs.
user_log_valid.select(['userid', 'firstname', 'ts', 'page', 'level', 'downgraded']).where(user_log_valid['userid'] == kelly_userid && user_log_valid['level'] == 'free').sort('ts').show()

SyntaxError: invalid syntax (<ipython-input-14-62d0e068c8ac>, line 1)