# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark DataFrames
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is useful, as is [this reference on doing joins in Spark DataFrames](http://www.learnbymarketing.com/1100/pyspark-joins-by-example/).

The [Databricks company has one of the better reference manuals for PySpark](https://docs.databricks.com/spark/latest/dataframes-datasets/index.html) -- it shows how to perform many common data operations, such as joins and aggregations following `groupBy`.

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

The following aggregation functions may be useful -- [these can be used to aggregate results of `groupby` operations](https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html#example-aggregations-using-agg-and-countdistinct). More documentation is at the [PySpark SQL Functions manual](https://spark.apache.org/docs/2.3.0/api/python/pyspark.sql.html#module-pyspark.sql.functions). Feel free to use other functions from that library.

In [3]:
from pyspark.sql.functions import col, count, countDistinct

Create the session as described in the tutorials.

In [4]:
spark = SparkSession \
    .builder \
    .appName("Lab4-Dataframe") \
    .master("local[*]")\
    .getOrCreate()

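Notes on the session builder:
* `SparkSession` is the entry point for the DataFrame API: it creates DataFrames and wraps a `SparkContext` (available as `spark.sparkContext`).
* `builder` returns a `SparkSession.Builder`, which collects settings such as `appName` and `master` before the session is created.
* `master("local[*]")` runs Spark locally, inside this process, with one worker thread per logical core. It does not involve YARN; a YARN deployment would use `master("yarn")`.
* `getOrCreate()` returns the already-running `SparkSession` if one exists; otherwise it creates a new one with the builder's settings.
* `SparkConf` only holds configuration key/value pairs; it does not create RDDs. RDDs are created through the `SparkContext` (e.g. `sc.parallelize`, `sc.textFile`).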

Read in the citations and patents data and check that it makes sense. Unlike in the RDD solution, `inferSchema="true"` makes Spark infer each column's type, so numeric columns such as `CITING` and `CITED` are read as integers rather than strings.

In [None]:
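# Read in the citations gzip file
# citations is a pyspark.sql.dataframe.DataFrame
# Spark decompresses .gz input automatically based on the file extension;
# the CSV "compression" option only applies when writing
citations = spark.read.load('cite75_99.txt.gz',
            format="csv", sep=",", header=True,
            inferSchema="true")

# Read in the patents gzip file
# patents is a pyspark.sql.dataframe.DataFrame
patents = spark.read.load('apat63_99.txt.gz',
            format="csv", sep=",", header=True,
            inferSchema="true")

# Save both tables as Parquet so later runs skip the slow CSV parse and
# schema inference; mode("overwrite") lets this cell be re-run safely
citations.write.mode("overwrite").parquet("citations.parquet")
patents.write.mode("overwrite").parquet("patents.parquet")

# Record counts from the RDD version of the lab:
# Length of patRdd = 2_923_922
# Length of citRdd = 16_522_438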

# Start Here

In [5]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, countDistinct

# Start a Spark Session
spark = SparkSession \
    .builder \
    .appName("Lab4-Dataframe") \
    .master("local[*]")\
    .getOrCreate()

# Read in the data from parquet
citations = spark.read.parquet("citations.parquet")
patents = spark.read.parquet("patents.parquet")

# Confirm data was read in correctly
print("Citations:")
citations.show(5)
print()
print("Patents:")
patents.show(5)

Citations:
+-------+-------+
| CITING|  CITED|
+-------+-------+
|3858241| 956203|
|3858241|1324234|
|3858241|3398406|
|3858241|3557384|
|3858241|3634889|
+-------+-------+
only showing top 5 rows


Patents:
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+
| PATENT|GYEAR|GDATE|APPYEAR|COUNTRY|POSTATE|ASSIGNEE|ASSCODE|CLAIMS|NCLASS|CAT|SUBCAT|CMADE|CRECEIVE|RATIOCIT|GENERAL|ORIGINAL|FWDAPLAG|BCKGTLAG|SELFCTUB|SELFCTLB|SECDUPBD|SECDLWBD|
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+
|3070801| 1963| 1096|   null|     BE|   null|    null|      1|  null|   269|  6|    69| null|       1|    null|    0.0|    null|    null|    null|    null|    null|    null|    null|
|3070802| 1963| 1096|   null|     US|     TX|    null|      

## Get the Patent-State Information

In [24]:
# Keep only PATENT and POSTATE, dropping rows where either is null (e.g. non-US patents)
states = patents.select(['PATENT','POSTATE']).na.drop()
print("States:")
states.show(5)

States:
+-------+-------+
| PATENT|POSTATE|
+-------+-------+
|3070802|     TX|
|3070803|     IL|
|3070804|     OH|
|3070805|     CA|
|3070806|     PA|
+-------+-------+
only showing top 5 rows



## Join the Patent-State Table with the Cited-Citing Table (x2)

In [19]:
citationResults = citations.join(states, citations.CITING == states.PATENT, 'left') \
                  .drop('PATENT').withColumnRenamed('POSTATE', 'CITING STATE') \
                  .join(states, citations.CITED == states.PATENT, 'left') \
                  .drop('PATENT').withColumnRenamed('POSTATE', 'CITED STATE')

# Keep only co-state citations (rows whose citing and cited states match), then count them per citing patent
citationResults = citationResults[citationResults["CITING STATE"] == citationResults["CITED STATE"]] \
                  .groupBy('CITING').count().sort("CITING", ascending=False)


citationResults.show(10)

+-------+-----+
| CITING|count|
+-------+-----+
|6009554|    8|
|6009551|    2|
|6009550|    4|
|6009549|    4|
|6009543|    1|
|6009541|   43|
|6009540|    4|
|6009539|    3|
|6009538|    1|
|6009536|    1|
+-------+-----+
only showing top 10 rows



## Append to the original Patent Information

In [34]:
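# Left-join the counts onto every patent; patents with no co-state citations
# get a null count, which fillna replaces with the integer 0 (matching the count column's type)
patTemp = patents.join(citationResults, patents['PATENT'] == citationResults['CITING'], 'left') \
          .drop('CITING').withColumnRenamed('count', '# COSTATE CITATIONS').fillna({'# COSTATE CITATIONS': 0})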
          
patTemp.sort('# COSTATE CITATIONS', ascending=False).show(10)

+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+-------------------+
| PATENT|GYEAR|GDATE|APPYEAR|COUNTRY|POSTATE|ASSIGNEE|ASSCODE|CLAIMS|NCLASS|CAT|SUBCAT|CMADE|CRECEIVE|RATIOCIT|GENERAL|ORIGINAL|FWDAPLAG|BCKGTLAG|SELFCTUB|SELFCTLB|SECDUPBD|SECDLWBD|# COSTATE CITATIONS|
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+-------------------+
|5959466| 1999|14515|   1997|     US|     CA|    5310|      2|  null|   326|  4|    46|  159|       0|     1.0|   null|  0.6186|    null|  4.8868|  0.0455|   0.044|    null|    null|                125|
|5983822| 1999|14564|   1998|     US|     TX|  569900|      2|  null|   114|  5|    55|  200|       0|   0.995|   null|  0.7201|    null|   12.45|     0.0|     0.0|    null|    null|      

***
# Steps for Lab
1. [Citing #] [Cited #] => [Citing #] [Citing State] [Cited #]
2. [Citing #] [Citing State] [Cited #] => [Citing #] [Citing State] [Cited #] [Cited State]
3. Filter to keep only rows where [Citing State] and [Cited State] are identical
4. Count the number of rows per [Citing #] and append that count to the [Citing #] patent information
5. Return the full patent list with the new count column (0 for patents with no co-state citations)
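The five steps above can be modeled without Spark on plain Python containers, which makes the logic easy to check by hand. This is a hypothetical stdlib-only sketch (the function `costate_counts` and its toy inputs are illustrative, not part of the lab); the Spark version distributes the same lookups as joins and the counting as a `groupBy`.

```python
from collections import Counter


def costate_counts(citations, patent_state, patents):
    """Hypothetical pure-Python model of steps 1-5 (no Spark).

    citations: list of (citing, cited) pairs
    patent_state: dict patent -> state (patents without a state are absent)
    patents: list of patent numbers to report on
    """
    counts = Counter()
    for citing, cited in citations:
        # Steps 1-2: look up the state of each side of the citation
        citing_state = patent_state.get(citing)
        cited_state = patent_state.get(cited)
        # Step 3: keep only citations where both states are known and equal
        if citing_state is not None and citing_state == cited_state:
            # Step 4: count matching citations per citing patent
            counts[citing] += 1
    # Step 5: every patent appears; those with no co-state citations get 0
    return {p: counts.get(p, 0) for p in patents}


result = costate_counts(
    citations=[(1, 2), (1, 3), (1, 4), (2, 3), (5, 1), (6, 1)],
    patent_state={1: "CO", 2: "CO", 3: "TX", 4: "CO", 5: "CO"},
    patents=[1, 2, 3, 4, 5, 6],
)
print(result)  # {1: 2, 2: 0, 3: 0, 4: 0, 5: 1, 6: 0}
```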


# Actions for RDD objects

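Some of the simplest actions are:
* count() - Return the number of elements in the RDD
* take(n) - Return the first n elements of the RDD as a Python list (a list of pyspark.sql.types.Row objects when the RDD comes from a DataFrame via df.rdd)
* first() - Return the first element itself; take(1) returns that same element wrapped in a one-item list
* collect() - Return every element to the driver as a list, like take(count()) - **returns the full RDD to the driver**
* takeSample(_withReplacement_:Boolean, _num_:int, [ _seed_:int ]) - Return a random sample of _num_ elements, with or without replacement
* takeOrdered( _num_, [ _key_ ] ) - Return the _num_ smallest elements, in ascending order or ordered by the optional _key_ function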


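# Operations for DataFrame Objects
Only show() and count() below are actions; columns and dtypes are attributes, and the rest are transformations that lazily return a new DataFrame.
* select(['ColumnName']) - Returns a new DF object with just the requested columns
* show(n=20) - Print the first n rows of the DataFrame
* count() - Return the number of rows in the DataFrame
* columns - Attribute: a list of column names
* dtypes - Attribute: a list of tuples of the form [(Column_Name, Data_Type)]
* toDF('new', 'column', 'names') - Returns a new DF object with every column renamed. Pass the names as separate arguments; unpack a list with toDF(*names)
* withColumnRenamed('old', 'new') - Returns a new DF object. Inputs (2): old column name to be replaced, new column name. Renames only one column per call
* drop('colName') - Returns a new DF object with the specified column dropped. Several names can be passed as separate arguments, e.g. drop('a', 'b'); to drop a list of names, unpack it with drop(*names)
* df[df.colName < value] - Returns a new DF object that keeps only the rows matching the condition (same as df.filter(...)) - SINGLE CONDITION
* df[(df.colName1 < value1) & (df.colName2 < value2)] - Same as above with several conditions; wrap each condition in parentheses and combine them with & (and), | (or), ~ (not) - MULTIPLE CONDITIONS
* df.withColumn('gpm', 1 / df.mpg) - Returns a new DF object with the first arg as the new column's name and the second arg as the column expression. Dividing by 0 returns null under the default non-ANSI settings of Spark 2.x/3.x
* df.fillna(val) - Returns a new DF object with nulls replaced by val. A scalar val only fills columns of a matching type (e.g. a string fills only string columns); a dict such as {'colName': val} fills specific columns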

# General Notes

Terminology:

Slices = Partitions

MAIN SPARK ABSTRACTIONS: <br>
RDD: Distributed collection of objects <br>
DataFrame: Distributed dataset of tabular data (integrated SQL, ML Algorithms)

Important Concepts: <br>
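Immutability: transformations return a new RDD/DataFrame instead of modifying the original <br>
Lazy evaluation: transformations only record a plan; nothing runs until an action such as count() or collect() is called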

/////////

RDD operations:

Length of an RDD = rdd.count()

/////////

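conf = <class 'pyspark.conf.SparkConf'> (configuration key/value pairs)
sc = <class 'pyspark.context.SparkContext'> (entry point for creating RDDs)
sc.parallelize(data, numSlices) distributes a local Python collection into an RDD with numSlices partitions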
