In [40]:
import pandas as pd
import pyspark as ps # for the pyspark suite
import warnings         # for displaying warning

#Initiate a SparkSession. A SparkSession embeds both a SparkContext and 
#a SQLContext to use RDD-based and DataFrame-based functionalities of Spark.
spark = ps.sql.SparkSession.builder \
        .master("local[4]") \
        .appName("df lecture") \
        .getOrCreate()

sc = spark.sparkContext

from string import Template

from datetime import datetime
import pyspark.sql.functions as func
from pyspark.sql.types import IntegerType

#### Question 1 - 

#### Importing and Loading the data
I have used .csv file format to store given data.

In [10]:

media_df = spark.read.csv("data/media.csv",header=True,sep="|")

In [11]:
media_df.show()

+----+-----------+-----------+----------+----+---------+
| STB|      TITLE|   PROVIDER|      DATE| REV|VIEW_TIME|
+----+-----------+-----------+----------+----+---------+
|stb1| the matrix|warner bros|2014-04-01|4.00|     1:30|
|stb1|unbreakable|buena vista|2014-04-03|6.00|     2:05|
|stb2| the hobbit|warner bros|2014-04-02|8.00|     2:45|
|stb3| the matrix|warner bros|2014-04-02|4.00|     1:05|
+----+-----------+-----------+----------+----+---------+



In [12]:
media_df.printSchema()

print(media_df.count())

root
 |-- STB: string (nullable = true)
 |-- TITLE: string (nullable = true)
 |-- PROVIDER: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- REV: string (nullable = true)
 |-- VIEW_TIME: string (nullable = true)

4


In [43]:
# To perform the aggregate functions on REV column convert string datatype to Integer --> for question 2.2
media_df = media_df.withColumn("REV", media_df["REV"].cast(IntegerType()))
media_df.printSchema()

root
 |-- STB: string (nullable = true)
 |-- TITLE: string (nullable = true)
 |-- PROVIDER: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- REV: integer (nullable = true)
 |-- VIEW_TIME: string (nullable = true)



### Data Exploration

In [44]:
# create view to access data from spark sql
media_df.createOrReplaceTempView("media_business")

In [179]:
result = spark.sql("SELECT STB,TITLE,PROVIDER ,DATE, REV,VIEW_TIME FROM media_business where REV >= 5.0 order by PROVIDER LIMIT 5")
result.show()

+----+-----------+-----------+----------+---+---------+
| STB|      TITLE|   PROVIDER|      DATE|REV|VIEW_TIME|
+----+-----------+-----------+----------+---+---------+
|stb1|unbreakable|buena vista|2014-04-03|  6|     2:05|
|stb2| the hobbit|warner bros|2014-04-02|  8|     2:45|
+----+-----------+-----------+----------+---+---------+



In [187]:
#/query -s TITLE,REV:sum,STB:collect -g TITLE

result = spark.sql("SELECT TITLE, sum(REV),count(distinct(STB)) FROM media_business group by TITLE")
result.show()

+-----------+--------+-------------------+
|      TITLE|sum(REV)|count(DISTINCT STB)|
+-----------+--------+-------------------+
| the hobbit|       8|                  1|
|unbreakable|       6|                  1|
| the matrix|       8|                  2|
+-----------+--------+-------------------+



In [380]:

result = spark.sql("SELECT TITLE, REV FROM media_business WHERE DATE ='2014-04-02' AND (TITLE = 'the hobbit' OR TITLE = 'the matrix')")
result.show()

+----------+---+
|     TITLE|REV|
+----------+---+
|the hobbit|  8|
|the matrix|  4|
+----------+---+



#### Data Wrangling

In [344]:
class Function_Param():
    '''Data Wrangling : This class is created to extract select,oderby, filter, groupby, 
    aggregate and boolean functions and extract respective column names'''
    
    
    def get_string(string):
        '''
                INPUT: string
                OUTPUT: list
                Given one string returns the list of strings.
            '''
        string =string.split()
        return string
            
        
        
    
    
    def find_parameters(value, items):
        '''
                INPUT: function name:string and user input string in tokenized list format.
                OUTPUT: list
                To get the parameters for each function like select, order by, filter & group by
            '''
        i = items.index(value)
        return items[i+1]
    
    
    def fetch_select_param(string):
        '''
                INPUT: string
                OUTPUT: list
                Extract select function parameters in list.
            '''
        # get the maching function in variable
        s_matching = [s for s in string if "-s" in s]
        if s_matching!=[]:
            s_param =Function_Param.find_parameters("-s",string).split(',')
        else:
            return
        return s_param
    
    def fetch_order_param(string):
        '''
                INPUT: string
                OUTPUT: list
                Extract orderby function parameters in list.
            '''
        o_matching = [o for o in string if "-o" in o]
        if o_matching!=[]:
            o_param =find_parameters("-o",string).split(',')
        else:
            return
        return o_param
        
    def fetch_filter_param(string):
        '''
                INPUT: string
                OUTPUT: list
                Extract filter function parameters in list.
            '''
        f_matching = [f for f in string if "-f" in f]
        if f_matching != []:
            f_param =find_parameters("-f",string).split(',')
        else:
            return
        return f_param
    
    def fetch_groupby_param(string):
        '''
                INPUT: string
                OUTPUT: list
                Extract group by function parameters in list.
            '''
        g_matching = [g for g in string if "-g" in g]
        if g_matching !=[]:
            g_param =find_parameters("-g",string).split(',')
        else:
            return
        return g_param

    def fetch_filter_param_strval(string):
        '''
                INPUT: string
                OUTPUT: list
                Extract filter function for the columns whose values are descriptive with 1 white 
                space parameters in list.
            '''
        f_matching = [f for f in string if "-f" in f]
        if f_matching != []:
            f_param =find_parameters_filter_boolean_strval("-f",string).split(',')
        else:
            return
        return f_param
   
    def fetch_OR_boolean_param(string):
        '''
                INPUT: string
                OUTPUT: list
                Extract boolean or function columns names in list.
            '''
        b_matching = [b for b in string if "OR" in b]
        if b_matching2 !=[]:
            b_param =find_parameters("OR",string).split(',')
        else:
            return
        return b_param
    
    def fetch_AND_boolean_param(string):
        '''
                INPUT: string
                OUTPUT: list
                Extract boolean and function columns names in list.
            '''
        a_matching = [a for a in string if "AND" in a]
        if a_matching1 !=[]:
            a_param =find_parameters("AND",string).split(',')
        else:
            return
        return a_param
    
   # converts select function list of column names/parameters to each column name variable 
    def select_columnnames(s_param):
        
            
        for n, val in enumerate(s_param):
            globals()["s_column%d"%n] = val
            
    # converts orderby function list of column names/parameters to each column name variable 
    def orderby_columnnames(o_param):
        for n, val in enumerate(o_param):
            globals()["o_column%d"%n] = val
            
     # converts group by function list of column names/parameters to each column name variable    
    def groupby_columnnames(g_param):
        for n, val in enumerate(g_param):
            globals()["g_column%d"%n] = val
    
     # converts filter function list of column names/parameters to each column name variable    
    def filter_columnnames(f_param):
        f_column = (",".join(s for s in f_param if "=".lower() in s.lower())).split("=",2)
        if f_column !="":
            return f_column[0],f_column[1]
        else:
            return
            
    # converts aggregate function list of column names/parameters to each column name variable       
    def agg_columnnames(ag_param):
        for n, val in enumerate(ag_param):
            globals()["ag_column%d"%n] = val
            
    # converts boolean AND list of column names/parameters to each column name variable       
    def and_columnnames(a_param):
        a_column = (",".join(s for s in a_param if "=" in s.lower())).split("=",2)
        if a_column !="":
            return a_column[0],a_column[1]
        else:
            return
            
    # converts boolean OR list of column names/parameters to each column name variable       
    def or_columnnames(b_param):
        b_column = (",".join(s for s in b_param if "=" in s.lower())).split("=",2)
        if b_column !="":
            return b_column[0],b_column[1]
        else:
            return
        
    # aggregate function -sum - column names are returned as column name , aggregate function name respectively    
    def agg_sum_param(s_param):
        sum_col = (",".join(s for s in s_param if ":sum".lower() in s.lower()))
        sum_col =sum_col.split(":",2)
        if sum_col !="":
            return sum_col[0],sum_col[1]
        else:
            return
        
    # aggregate function -min - column names are returned as column name , aggregate function name respectively    
    def agg_min_param(s_param):
        min_col = (",".join(s for s in s_param if ":min".lower() in s.lower())).split(":",2)
        if min_col !="":
            return min_col[0],min_col[1]
        else:
            return
        
     # aggregate function -max - column names are returned as column name , aggregate function name respectively    
    def agg_max_param(s_param):
        max_col = (",".join(s for s in s_param if ":max".lower() in s.lower())).split(":",2)
        if max_col !="":
            return max_col[0],max_col[1]
        else:
            return
        
    # aggregate function -count - column names are returned as column name , aggregate function name respectively        
    def agg_count_param(s_param):
        count_col = (",".join(s for s in s_param if ":count".lower() in s.lower())).split(":",2)
        if count_col !="":
            return count_col[0],count_col[1]
        else:
            return
        
     # aggregate function -collect - column names are returned as column name , aggregate function name respectively    
    def agg_collect_param(s_param):
        collect_col = (",".join(s for s in s_param if ":COLLECT".lower() in s.lower())).split(":",2)
        if collect_col !="":
            return collect_col[0],collect_col[1]
        else:
            return
        
    # to get the values of functions which has string values with one space
    def find_parameters_filter_boolean_strval(value, items):
        i = items.index(value)
        x =(" ".join(s for s in items[i+1:i+3]))
        return x
    
     
    def fetch_OR_boolean_param_strval(string):
        b_matching = [b for b in string if "OR" in b]
        if b_matching !=[]:
            b_param =find_parameters_filter_boolean_strval("OR",string).split(',')
        else:
            return
        return b_param
    
     # to get the values of functions which has string values with one space
    def fetch_AND_boolean_param_strval(string):
        a_matching = [a for a in string if "AND" in a]
        a_matching
        if a_matching !=[]:
            a_param =find_parameters_filter_boolean_strval("AND",string).split(',')
        else:
            return
        return a_param
    
     
    

In [1]:
class DML_Functions():
    '''This class is created to build spark sql queries with select,oderby, filter, groupby, 
    aggregate and boolean functions '''
    
    def select_orderby(table_name, select_list,orderby_list):
        '''
        Returns the SQL query for computing column statistics.
        Passing Input table_name, select function column names and 
        order by function column names.
        '''
        Function_Param.select_columnnames(selected_list)
        Function_Param.orderby_columnnames(orderby_list)
        query = '''SELECT ${s_col1}, ${s_col2},${s_col3}
                   FROM ${table_name}
        ORDER BY ${o_col1}, ${o_col2}'''
        t = Template(query)
        qry_template= t.substitute(s_col1 = s_column0, s_col2 = s_column1,s_col3 = s_column2,o_col1 = o_column0, o_col2 = o_column1, table_name=table_name)

        return qry_template
    
    
        #2.1
    #Function Select_Filter
    #./query -s TITLE,REV,DATE -f DATE=2014-04-01

    def select_filter(table_name, selected_list,filter_list):
        '''
        Returns the SQL query for computing column statistics.
        Passing Input table_name, select function column names and 
        filter/where clause column names.
        '''
        Function_Param.select_columnnames(selected_list)
        f_column1,filter_value=Function_Param.filter_columnnames(filter_list)
        if f_column1 == 'DATE':
            filter_value = "CAST('"+ filter_value +"' AS date)"

        query = '''SELECT ${s_col1}, ${s_col2},${s_col3}
                   FROM ${table_name}
                   WHERE ${f_column1} = ${filter_value}'''
        t = Template(query)
        qry_template= t.substitute(s_col1 = s_column0, s_col2 = s_column1,s_col3 = s_column2,f_column1=f_column1, table_name=table_name, filter_value=filter_value)  
        return qry_template
    
    
    
    def select_groupby_agg(table_name, selected_list,groupby_list,ag_col1,ag_funct1,ag_col2,ag_funct2):
        '''
        Returns the SQL for computing column statistics.
        Passing Input table_name, select function column names, 
        group by function column names and aggregate functions and aggregrate function column names.
        '''
        Function_Param.select_columnnames(selected_list)
        Function_Param.groupby_columnnames(groupby_list)
        
        

        query = '''SELECT ${s_col1}, ${ag_funct1}(${ag_col1}),count(distinct(${ag_col2})) 
                    FROM ${table_name} 
                    group by ${g_column1}'''
      
        t = Template(query)
        qry_template= t.substitute(s_col1 = s_column0,g_column1=g_column0, table_name=table_name,ag_col1=ag_col1,ag_funct1=ag_funct1,ag_col2=ag_col2,ag_funct2=ag_funct2)  
        return qry_template
    
    
    
    def select_advfilter_single_exp(table_name, selected_list,filter_list,operator_list,expression):
        '''
        Returns the SQL for computing column statistics.
        Passing Input table_name, select function column names, 
        filter/where clause column names and boolean operators and boolean operators column names.
        '''
        Function_Param.select_columnnames(selected_list)
        f_column1,filter_value=Function_Param.filter_columnnames(filter_list)
        op_column1,op_value=Function_Param.or_columnnames(operator_list)
        if f_column1 == 'DATE':
            filter_value = "CAST('"+ filter_value +"' AS date)"
        elif op_column1 == 'DATE':
            op_value = "CAST('"+ op_value +"' AS date)"

        query = '''SELECT ${s_col1}, ${s_col2}
                   FROM ${table_name}
                   WHERE ${f_column1} = ${filter_value} ${expression} ${op_column1} = ${op_value}'''
        t = Template(query)
        qry_template= t.substitute(s_col1 = s_column0, s_col2 = s_column1,f_column1=f_column1, table_name=table_name, filter_value=filter_value,op_column1=op_column1, expression=expression, op_value=op_value)  
        return qry_template
    
    
    def select_advfilter_double_exp(table_name, selected_list,filter_list,operator_list1,operator_list2,expression1,expression2):
        '''
        Returns the SQL for computing column statistics.
        Passing Input table_name, select function column names, 
        filter/where clause column names and boolean operators and boolean operators column names.
        '''
        Function_Param.select_columnnames(selected_list)
        f_column1,filter_value=Function_Param.filter_columnnames(filter_list)
        op_column1,op_value1=Function_Param.and_columnnames(operator_list1)
        op_column2,op_value2=Function_Param.or_columnnames(operator_list2)
        if f_column1 == 'DATE':
            filter_value = "CAST('"+ filter_value +"' AS date)"
        elif op_column1 == 'DATE':
            op_value = "CAST('"+ op_value +"' AS date)"
        
            
        query = '''SELECT ${s_col1}, ${s_col2}
                   FROM ${table_name}
                   WHERE ${f_column1} = ${filter_value} ${expression1} (${op_column1} = ${op_value1} ${expression2} ${op_column2} = ${op_value2})'''
        t = Template(query)
        qry_template= t.substitute(s_col1 = s_column0, s_col2 = s_column1,f_column1=f_column1, table_name=table_name, filter_value=filter_value,op_column1=op_column1,op_column2=op_column2, expression1=expression1,expression2=expression2, op_value1=op_value1,op_value2=op_value2)  
        return qry_template

### Question 2.1
###### The next task is to create a query tool that can execute simple queries against the datastore you created in step one.  The tool should accept command line args for SELECT, ORDER and FILTER functions:

Created test case below for user input -->./query -s TITLE,REV,DATE -o DATE,TITLE


In [151]:
if __name__ == '__main__':
    # Accept input string from user. Given input string is --> ./query -s TITLE,REV,DATE -o DATE,TITLE
    string = str(input())
    st = Function_Param.get_string(string)
    selected_list =Function_Param.fetch_select_param(st)
    ordered_list=Function_Param.fetch_order_param(st)



In [148]:
    selected_list,ordered_list

(['TITLE', 'REV', 'DATE'], ['DATE', 'TITLE'])

In [152]:
    # Creating the dynamic query by calling select_orderby function in DML_Functions class.
    qry = DML_Functions.select_orderby('media_business',selected_list,ordered_list)

    result = spark.sql(qry)
    '''The result of Question 2.1 1st test case --> ./query -s TITLE,REV,DATE -o DATE,TITLE '''
    result.show()

+-----------+---+----------+
|      TITLE|REV|      DATE|
+-----------+---+----------+
| the matrix|  4|2014-04-01|
| the hobbit|  8|2014-04-02|
| the matrix|  4|2014-04-02|
|unbreakable|  6|2014-04-03|
+-----------+---+----------+



#### Question 2.1
For DML functions select & filter. 
Created test case for user input -->./query -s TITLE,REV,DATE -f DATE=2014-04-01


In [173]:
    # Accept input string from user. Given input string is --> ./query -s TITLE,REV,DATE -f DATE=2014-04-01
    string = str(input())
    st = Function_Param.get_string(string)
    selected_list =Function_Param.fetch_select_param(st)
    filter_list=Function_Param.fetch_filter_param(st)

In [235]:
    selected_list,filter_list

(['TITLE', 'REV:sum', 'STB:collect'], ['DATE=2014-04-01'])

In [175]:
    qry = DML_Functions.select_filter('media_business',selected_list,filter_list)

    result = spark.sql(qry)
    '''The result of Question 2.1 2nd test case --> ./query -s TITLE,REV,DATE -f DATE=2014-04-01 '''
    result.show()

+----------+---+----------+
|     TITLE|REV|      DATE|
+----------+---+----------+
|the matrix|  4|2014-04-01|
+----------+---+----------+



### Question 2.2

 
The next step is to add group by and aggregate functions to your query tool.  Your tool should support the following aggregates:
Created below test case for user input --> ./query -s TITLE,REV:sum,STB:collect -g TITLE


In [229]:
#./query -s TITLE,REV:sum,STB:collect -g TITLE
string = str(input())
st = Function_Param.get_string(string)
selected_list =Function_Param.fetch_select_param(st)
groupby_list=Function_Param.fetch_groupby_param(st)
ag_col1,ag_funct1 = Function_Param.agg_sum_param(st)



In [230]:
selected_list,groupby_list

(['TITLE', 'REV:sum', 'STB:collect'], ['TITLE'])

In [234]:
qry = DML_Functions.select_groupby_agg('media_business',selected_list,groupby_list,'REV','sum','STB','collect')

result = spark.sql(qry)
'''The result of Question 2.2 test case --> ./query -s TITLE,REV:sum,STB:collect -g TITLE '''

result.show()

+-----------+--------+-------------------+
|      TITLE|sum(REV)|count(DISTINCT STB)|
+-----------+--------+-------------------+
| the hobbit|       8|                  1|
|unbreakable|       6|                  1|
| the matrix|       8|                  2|
+-----------+--------+-------------------+



In [237]:
# Validating the answer using Spark df
media_grp_df = media_df

In [238]:
df_agg = media_grp_df.groupby("TITLE").\
    agg(
        func.sum("REV"),
         func.countDistinct("STB") 
    )

In [239]:
df_agg.show()

+-----------+--------+-------------------+
|      TITLE|sum(REV)|count(DISTINCT STB)|
+-----------+--------+-------------------+
| the hobbit|       8|                  1|
|unbreakable|       6|                  1|
| the matrix|       8|                  2|
+-----------+--------+-------------------+



### Question 2.3
Advanced filter function 

##### Using the single boolean operator  - 
Build the test case for user input -->
./query -s TITLE,REV -f 'TITLE="the hobbit" OR TITLE="the matrix"'

In [329]:
# Input query string --> "./query -s TITLE,REV -f TITLE='the hobbit' OR TITLE='the matrix'"
string = str(input())
st = Function_Param.get_string(string)
                               
selected_list =Function_Param.fetch_select_param(st)
filter_list=Function_Param.fetch_filter_param_strval(st)
operator_list = Function_Param.fetch_OR_boolean_param_strval(st)


In [330]:
st,selected_list,filter_list,operator_list

(['./query',
  '-s',
  'TITLE,REV',
  '-f',
  "TITLE='the",
  "hobbit'",
  'OR',
  "TITLE='the",
  "matrix'"],
 ['TITLE', 'REV'],
 ["TITLE='the hobbit'"],
 ["TITLE='the matrix'"])

In [381]:
qry = DML_Functions.select_advfilter_single_exp('media_business',selected_list,filter_list,operator_list,'OR')
result = spark.sql(qry)
# Below is the result of question 2.3 with user input as "./query -s TITLE,REV -f TITLE='the hobbit' OR TITLE='the matrix'"
result.show()

+-----------+---+
|      TITLE|REV|
+-----------+---+
| the matrix|  4|
|unbreakable|  6|
| the matrix|  4|
+-----------+---+



### Question 2.3

Add a filter function which evaluates boolean AND and OR expressions in the following format:
            STB="stb1" AND TITLE="the hobbit" OR TITLE="unbreakable"
            Assume AND has higher precedence than OR.  Parens can be added to change the above statement to the more logical:
            STB="stb1" AND (TITLE="the hobbit" OR TITLE="unbreakable")
            
            
Advanced filter function. 
Using the double boolean operator  - 
Build the test case for user input -->
"./query -s TITLE,REV,DATE -f DATE=2014-04-02 AND TITLE='the hobbit' OR TITLE='the matrix'"

In [384]:
# Input query string --> "./query -s TITLE,REV,DATE -f DATE=2014-04-02 AND TITLE='the hobbit' OR TITLE='the matrix'"
string = str(input())
st = Function_Param.get_string(string)
                               
selected_list =Function_Param.fetch_select_param(st)
filter_list=Function_Param.fetch_filter_param(st)
operator_list1 = Function_Param.fetch_AND_boolean_param_strval(st)
operator_list2 = Function_Param.fetch_OR_boolean_param_strval(st)


DATE=2014-04-02 AND


In [386]:
st,selected_list,filter_list,operator_list1,operator_list2

(['./query',
  '-s',
  'TITLE,REV,DATE',
  '-f',
  'DATE=2014-04-02',
  'AND',
  "TITLE='the",
  "hobbit'",
  'OR',
  "TITLE='the",
  "matrix'"],
 ['TITLE', 'REV', 'DATE'],
 ['DATE=2014-04-02'],
 ["TITLE='the hobbit'"],
 ["TITLE='the matrix'"])

In [387]:
qry = DML_Functions.select_advfilter_double_exp('media_business',selected_list,filter_list,operator_list1,operator_list2,'AND', 'OR')
result = spark.sql(qry)
print(qry)
# Below is the result of question 2.3 with user input as "./query -s TITLE,REV -f TITLE='the hobbit' OR TITLE='the matrix'"
result.show()

SELECT TITLE, REV
                   FROM media_business
                   WHERE DATE = CAST('2014-04-02' AS date) AND (TITLE = 'the hobbit' OR TITLE = 'the matrix')
+----------+---+
|     TITLE|REV|
+----------+---+
|the hobbit|  8|
|the matrix|  4|
+----------+---+



### Question 2.3
Add a filter function which evaluates boolean AND and OR expressions in the following format:
            STB="stb1" AND TITLE="the hobbit" OR TITLE="unbreakable"
            Assume AND has higher precedence than OR.  Parens can be added to change the above statement to the more logical:
            STB="stb1" AND (TITLE="the hobbit" OR TITLE="unbreakable")


Advanced filter function. Using the double boolean operator  - 
Build the test case for user input -->
"./query -s TITLE,REV,DATE -f STB='stb1' AND TITLE='the hobbit' OR TITLE='the matrix'"

In [388]:
# Input query string --> "./query -s TITLE,REV,DATE -f STB='stb1' AND TITLE='the hobbit' OR TITLE='the matrix'"
string = str(input())
st = Function_Param.get_string(string)
                               
selected_list =Function_Param.fetch_select_param(st)
filter_list=Function_Param.fetch_filter_param(st)
operator_list1 = Function_Param.fetch_AND_boolean_param_strval(st)
operator_list2 = Function_Param.fetch_OR_boolean_param_strval(st)


STB='stb1' AND


In [389]:
filter_list = ["STB='stb1'"]

In [390]:
st,selected_list,filter_list,operator_list1,operator_list2

(['./query',
  '-s',
  'TITLE,REV,DATE',
  '-f',
  "STB='stb1'",
  'AND',
  "TITLE='the",
  "hobbit'",
  'OR',
  "TITLE='the",
  "matrix'"],
 ['TITLE', 'REV', 'DATE'],
 ["STB='stb1'"],
 ["TITLE='the hobbit'"],
 ["TITLE='the matrix'"])

In [391]:
qry = DML_Functions.select_advfilter_double_exp('media_business',selected_list,filter_list,operator_list1,operator_list2,'AND', 'OR')
result = spark.sql(qry)
print(qry)
# Below is the result of question 2.3 with user input as "./query -s TITLE,REV,DATE -f STB='stb1' AND TITLE='the hobbit' OR TITLE='the matrix'"
result.show()

SELECT TITLE, REV
                   FROM media_business
                   WHERE STB = 'stb1' AND (TITLE = 'the hobbit' OR TITLE = 'the matrix')
+----------+---+
|     TITLE|REV|
+----------+---+
|the matrix|  4|
+----------+---+

