In [1]:
#importing findspark
import findspark
findspark.init()


In [91]:
# create a SparkSession object
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.functions import count, col, min, max
spark = SparkSession.builder.master("local").appName("whitehouselog").getOrCreate()
#spark.conf.set('spark.sql.caseSensitive', True)
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from pyspark.sql.functions import split

In [3]:
sc = spark.sparkContext

In [4]:
data = [("alex", [1,2]) , ("jane", []) , ("ted", [1,2,3])]
columns = ["name", "tricks"]

In [5]:
df = spark.createDataFrame(data=data,schema=columns)
df.show(truncate=False)

+----+---------+
|name|tricks   |
+----+---------+
|alex|[1, 2]   |
|jane|[]       |
|ted |[1, 2, 3]|
+----+---------+



In [6]:
df2 = df.withColumn("num_tricks", f.size(col("tricks")))
df2.show()

+----+---------+----------+
|name|   tricks|num_tricks|
+----+---------+----------+
|alex|   [1, 2]|         2|
|jane|       []|         0|
| ted|[1, 2, 3]|         3|
+----+---------+----------+



In [7]:
df3 = df2.filter(df2.num_tricks > 1)

In [8]:
df3.show()

+----+---------+----------+
|name|   tricks|num_tricks|
+----+---------+----------+
|alex|   [1, 2]|         2|
| ted|[1, 2, 3]|         3|
+----+---------+----------+



In [9]:
df2 = df2.orderBy("num_tricks",ascending=False).limit(5).show()

+----+---------+----------+
|name|   tricks|num_tricks|
+----+---------+----------+
| ted|[1, 2, 3]|         3|
|alex|   [1, 2]|         2|
|jane|       []|         0|
+----+---------+----------+



In [10]:
df4 = df.select(df.name,(f.explode(df.tricks).alias("Trick Number")))

In [11]:
df4.show()

+----+------------+
|name|Trick Number|
+----+------------+
|alex|           1|
|alex|           2|
| ted|           1|
| ted|           2|
| ted|           3|
+----+------------+



In [12]:
df5 = df4.groupBy('Trick Number').agg(f.count('Trick Number').alias('my_count'))

In [13]:
df5.show()

+------------+--------+
|Trick Number|my_count|
+------------+--------+
|           1|       2|
|           3|       1|
|           2|       2|
+------------+--------+



In [14]:
df5.sort('my_count', ascending=False).show()

+------------+--------+
|Trick Number|my_count|
+------------+--------+
|           2|       2|
|           1|       2|
|           3|       1|
+------------+--------+



In [15]:
df6 = df.filter(df.name.isNotNull())

In [16]:
df6.show()

+----+---------+
|name|   tricks|
+----+---------+
|alex|   [1, 2]|
|jane|       []|
| ted|[1, 2, 3]|
+----+---------+



In [17]:
df6 = df6.withColumn("num_tricks", f.size(col("tricks")))

In [18]:
df6.show()

+----+---------+----------+
|name|   tricks|num_tricks|
+----+---------+----------+
|alex|   [1, 2]|         2|
|jane|       []|         0|
| ted|[1, 2, 3]|         3|
+----+---------+----------+



In [19]:
filtered = df6.filter(df6.num_tricks == 0)

In [20]:
filtered.select('num_tricks').show()

+----------+
|num_tricks|
+----------+
|         0|
+----------+



In [21]:
filtered.select(df6.name).distinct().show()

+----+
|name|
+----+
|jane|
+----+



In [22]:
filtered2= df6.filter(df6.num_tricks > 1)

In [23]:
filtered2.select(df6.name).distinct().show()

+----+
|name|
+----+
| ted|
|alex|
+----+



In [24]:
data2 = [("g1", "1", "2.3") , ("g1", "2", "1.5") , ("g1", "3", "2.5"), ("g1", "1", "4.1"), ("g2", "1", "1.3"), 
        ("g2", "2", "1.8"), ("g2", "3", "3.5"), ("g2", "1", "4.3"), ("g2", "1", "2.9")]

columns = ["geneID", "reference", "geneValue"]



In [25]:
dataframe = spark.createDataFrame(data=data2,schema=columns)
dataframe.show(truncate=False)

+------+---------+---------+
|geneID|reference|geneValue|
+------+---------+---------+
|g1    |1        |2.3      |
|g1    |2        |1.5      |
|g1    |3        |2.5      |
|g1    |1        |4.1      |
|g2    |1        |1.3      |
|g2    |2        |1.8      |
|g2    |3        |3.5      |
|g2    |1        |4.3      |
|g2    |1        |2.9      |
+------+---------+---------+



In [26]:
gene_path = "/tmp/gene.txt"
rdd_gene = sc.textFile(gene_path)

In [27]:
df=dataframe

In [28]:
df_g1 = df.filter(df.reference == '1')

In [29]:
df_geneID = df_g1.groupBy('geneID').agg({'reference' : 'count', 'geneValue' : 'sum'})

In [30]:
df_geneID.show()

+------+----------------+------------------+
|geneID|count(reference)|    sum(geneValue)|
+------+----------------+------------------+
|    g2|               3|               8.5|
|    g1|               2|6.3999999999999995|
+------+----------------+------------------+



In [31]:
df.filter(df.reference < 3).show()

+------+---------+---------+
|geneID|reference|geneValue|
+------+---------+---------+
|    g1|        1|      2.3|
|    g1|        2|      1.5|
|    g1|        1|      4.1|
|    g2|        1|      1.3|
|    g2|        2|      1.8|
|    g2|        1|      4.3|
|    g2|        1|      2.9|
+------+---------+---------+



In [32]:
data = [('user1', 'm1','3'), ('user1', 'm1', '1'), ('user1', 'm2', '5'), ('user2', 'm1', '4')]

columns = ['userId', 'movieId', 'rating']

df = spark.createDataFrame(data=data, schema=columns)

df.show(truncate=False)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|user1 |m1     |3     |
|user1 |m1     |1     |
|user1 |m2     |5     |
|user2 |m1     |4     |
+------+-------+------+



In [33]:
df_groupby = df.groupBy('movieID').agg(count('userID'))

In [34]:
df_groupby.show()

+-------+-------------+
|movieID|count(userID)|
+-------+-------------+
|     m1|            3|
|     m2|            1|
+-------+-------------+



c. The goal is to find the number of unique 
movies rated by each user. Write a complete 
PySpark program (as a set of trasformations 
and actions) to accomplish this task. Your 
output will be

<userID> <number-of-unique-movies>
    
#output
 
    User 1 - 2
    User 2 - 1

In [35]:
df.groupBy('userID').agg(f.countDistinct('movieID')).show()

+------+--------------+
|userID|count(movieID)|
+------+--------------+
| user1|             2|
| user2|             1|
+------+--------------+



In [36]:
data = [("1", "fulltime"), ("2", "parttime"), ("3", "Contractor"), ("4", "Contractor"), ("5", "fulltime"), ("6", "Contractor"), 
        ("7", "fulltime"), ("8", "parttime"), ("9", "parttime"), ("10", "fulltime"), ("1", "parttime"), ("3", "parttime"),
         ("7", "parttime"), ("8", "Contractor")]

columns = ["EmployeeID", "type"]

df = spark.createDataFrame(data=data, schema=columns)

df.show()

+----------+----------+
|EmployeeID|      type|
+----------+----------+
|         1|  fulltime|
|         2|  parttime|
|         3|Contractor|
|         4|Contractor|
|         5|  fulltime|
|         6|Contractor|
|         7|  fulltime|
|         8|  parttime|
|         9|  parttime|
|        10|  fulltime|
|         1|  parttime|
|         3|  parttime|
|         7|  parttime|
|         8|Contractor|
+----------+----------+



In [37]:
df2 = df.dropDuplicates(["EmployeeID"])

In [38]:
df2.sort('EmployeeID').show()

+----------+----------+
|EmployeeID|      type|
+----------+----------+
|         1|  fulltime|
|        10|  fulltime|
|         2|  parttime|
|         3|Contractor|
|         4|Contractor|
|         5|  fulltime|
|         6|Contractor|
|         7|  fulltime|
|         8|  parttime|
|         9|  parttime|
+----------+----------+



In [39]:
df2.groupBy('type').agg(count('*'). alias("Total")).show()

+----------+-----+
|      type|Total|
+----------+-----+
|Contractor|    3|
|  parttime|    3|
|  fulltime|    4|
+----------+-----+



In [40]:
df = spark.read.text("/Users/C940/spark3/movies.txt")

In [42]:
df.show()

+--------------+
|         value|
+--------------+
|user1,movie1,3|
|user1,movie1,1|
|user1,movie2,5|
|user2,movie1,4|
+--------------+



In [50]:
df1 = df.withColumn('userID', split(df['value'], ',').getItem(0))\
                   .withColumn('movieID', split(df['value'], ',').getItem(1))\
                    .withColumn('Rating', split(df['value'], ',').getItem(2)).drop('value')
                   

In [51]:
df1.show()

+------+-------+------+
|userID|movieID|Rating|
+------+-------+------+
| user1| movie1|     3|
| user1| movie1|     1|
| user1| movie2|     5|
| user2| movie1|     4|
+------+-------+------+



In [52]:
df_groupby = df1.groupBy('movieID').agg(count('userID'))

In [54]:
df_groupby.show()

+-------+-------------+
|movieID|count(userID)|
+-------+-------------+
| movie2|            1|
| movie1|            3|
+-------+-------------+



In [55]:
df1.groupBy('userID').agg(f.countDistinct('movieID')).show()

+------+--------------+
|userID|count(movieID)|
+------+--------------+
| user1|             2|
| user2|             1|
+------+--------------+



In [56]:
df = spark.read.text("/Users/C940/spark3/movie.txt")

In [57]:
df.show()

+--------+
|   value|
+--------+
|movie1,3|
|movie1,1|
|movie1,5|
|movie2,5|
|movie2,4|
|movie2,3|
+--------+



In [58]:
df2 = df.withColumn('movieID', split(df['value'], ',').getItem(0))\
                   .withColumn('Rating', split(df['value'], ',').getItem(1))\
                    .drop('value')

In [59]:
df2.show()

+-------+------+
|movieID|Rating|
+-------+------+
| movie1|     3|
| movie1|     1|
| movie1|     5|
| movie2|     5|
| movie2|     4|
| movie2|     3|
+-------+------+



In [61]:
df2.groupBy('movieID').agg(count('Rating').alias('count of raters')).show()

+-------+---------------+
|movieID|count of raters|
+-------+---------------+
| movie2|              3|
| movie1|              3|
+-------+---------------+



In [63]:
df2.groupBy('movieID').agg({'Rating': 'mean'}).show()

+-------+-----------+
|movieID|avg(Rating)|
+-------+-----------+
| movie2|        4.0|
| movie1|        3.0|
+-------+-----------+



In [110]:
df = spark.read.text("/Users/C940/spark3/geneval.txt")

In [111]:
df.show()

+---------+
|    value|
+---------+
| 1,g1,2.3|
| 2,g1,1.5|
| 3,g1,2.5|
|3,g1,-2.4|
| 1,g1,4.1|
|1,g1,-2.0|
| 1,g2,1.3|
| 2,g2,1.8|
| 3,g2,3.5|
|1,g2,-0.5|
| 1,g2,4.3|
| 1,g2,2.9|
|1,g2,-3.0|
+---------+



In [115]:
df3 = df.withColumn('recordNumber', split(df['value'], ',').getItem(0))\
                   .withColumn('geneID', split(df['value'], ',').getItem(1))\
                    .withColumn('geneValue', split(df['value'], ',').getItem(2)).drop('value')

In [116]:
df3.show()

+------------+------+---------+
|recordNumber|geneID|geneValue|
+------------+------+---------+
|           1|    g1|      2.3|
|           2|    g1|      1.5|
|           3|    g1|      2.5|
|           3|    g1|     -2.4|
|           1|    g1|      4.1|
|           1|    g1|     -2.0|
|           1|    g2|      1.3|
|           2|    g2|      1.8|
|           3|    g2|      3.5|
|           1|    g2|     -0.5|
|           1|    g2|      4.3|
|           1|    g2|      2.9|
|           1|    g2|     -3.0|
+------------+------+---------+



In [117]:
df3.groupBy("geneID").agg({'geneValue': 'mean'}).show()

+------+------------------+
|geneID|    avg(geneValue)|
+------+------------------+
|    g2|1.4714285714285713|
|    g1|               1.0|
+------+------------------+



In [118]:
df3.groupBy("geneID").agg(min("geneValue"), max("geneValue")).show()

+------+--------------+--------------+
|geneID|min(geneValue)|max(geneValue)|
+------+--------------+--------------+
|    g1|          -2.0|           4.1|
|    g2|          -0.5|           4.3|
+------+--------------+--------------+



In [123]:
df3.groupBy('geneID').agg({'geneValue': 'min'}).show()

+------+--------------+
|geneID|min(geneValue)|
+------+--------------+
|    g1|          -2.0|
|    g2|          -0.5|
+------+--------------+



In [125]:
df3.filter(df3.geneValue > 0).show()

+------------+------+---------+
|recordNumber|geneID|geneValue|
+------------+------+---------+
|           1|    g1|      2.3|
|           2|    g1|      1.5|
|           3|    g1|      2.5|
|           1|    g1|      4.1|
|           1|    g2|      1.3|
|           2|    g2|      1.8|
|           3|    g2|      3.5|
|           1|    g2|      4.3|
|           1|    g2|      2.9|
+------------+------+---------+



In [126]:
df = spark.read.text("/Users/C940/spark3/geneval.txt")

In [127]:
df4 = df.withColumn('recordNumber', split(df['value'], ',').getItem(0))\
                   .withColumn('geneID', split(df['value'], ',').getItem(1))\
                    .withColumn('geneValue', split(df['value'], ',').getItem(2)).drop('value')

In [130]:
df4.drop('recordNumber').show()

+------+---------+
|geneID|geneValue|
+------+---------+
|    g1|      2.3|
|    g1|      1.5|
|    g1|      2.5|
|    g1|     -2.4|
|    g1|      4.1|
|    g1|     -2.0|
|    g2|      1.3|
|    g2|      1.8|
|    g2|      3.5|
|    g2|     -0.5|
|    g2|      4.3|
|    g2|      2.9|
|    g2|     -3.0|
+------+---------+



In [151]:
df4.groupBy('geneID').agg(f.percentile_approx("geneValue", 0.5, f.lit(1000000))).show()

+------+------------------------------------------+
|geneID|percentile_approx(geneValue, 0.5, 1000000)|
+------+------------------------------------------+
|    g2|                                       1.8|
|    g1|                                       1.5|
+------+------------------------------------------+



In [171]:
def pos(geneValue): 
    return geneValue[geneValue > 0]
  
def neg(geneValue): 
    return geneValue[geneValue < 0]