Libraries

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import pyspark.sql.functions as f

Loading Data

In [0]:
spark = SparkSession.builder.appName('Spark DataFrames').getOrCreate()

crime = spark.read.csv(path='dbfs:/FileStore/crime.csv',header=True,inferSchema=True)
schools = spark.read.csv(path='dbfs:/FileStore/schools.csv',header=True,inferSchema=True)
rankings = spark.read.csv(path='dbfs:/FileStore/rankings.csv',header=True,inferSchema=True)

Load a subset of columns
<br>
The following shows:
- a sub-selection of the college dataframe
- some column headings are renamed

In [0]:
display(college.head(1))

Institution_Name,City,State,City State,Zip,URL,Main_Campus,Predominant_Ugrad_Deg,Highest_Deg,Control,Locale,Lat,Long,Religious_Affiliation,Adm_Rate,SAT_R_75,SAT_M_75,SAT_W_75,ACT_CUM_75,Undergrad_Enrollment,Percent_White,Percent_Black,Percent_Hisp,Percent_Asian,Percent_AIAN,Percent_NHPI,Percent_2OrMore,Percent_NRA,Percent_UNKN,Percent_Part-time,Avg_Cost_Academic_Year,Avg_Cost_Program_Year,In-state_Tuition,Out-of-state_Tuition,Avg_Fac_Sal,Percent_Full-time_Fac,CompletionRate_150_4,CompletionRate_150_L4,RetentionRate_FT4,RetentionRate_FTL4,RetentionRate_PT4,RetentionRate_PTL4,Compl_Repay_1yr_Rate,Noncom_Repay_1yr_Rate,Compl_Repay_7yr_Rate,Noncom_Repay_7yr_Rate,Low_Inc_Aid,Parent_Ed_MS,Parent_Ed_HS,Parent_Ed_PS,Percent_Female,Percent_Male,Percent_Veterans,Percent_First_Gen,Level of institution,TIV_Approval_Date,Top3Majors
Alabama A & M University,Normal,AL,Normal AL,35762,www.aamu.edu/,Yes,Predominantly bachelor's-degree granting,Graduate degree,Public,"City: Midsize (population of at least 100,000 but less than 250,000)",34.783368,-86.568502,,0.8738,470,470,457,19,4616,0.0256,0.9129,0.0076,0.0019,0.0024,0.0017,0.0401,0.0065,0.0013,0.0877,22667,,9366,17496,7028,0.7354,0.2749,,0.5769,,0.3091,,0.336676218,0.111891892,0.563870968,0.286221591,0.602008788,0.016422083,0.34940601,0.634171908,0.564030132,0.009102323,0.003138732,0.365828092,4-year,12/12/1965,"Computer Engineering, Aviation, Neuroscience"


In [0]:
collegedf = (
    spark
    .read.csv(path='dbfs:/FileStore/college.csv',
              header=True,
              inferSchema=True
             )
    .select(
        f.col('Institution_Name').alias('college'), 
        f.col('SAT_R_75'), 
        f.col('State').alias('state'),
        'Undergrad_Enrollment', 
        f.col('Avg_Cost_Academic_Year').alias('cost_per_year'),
        'In-state_Tuition',
        'Out-of-state_Tuition',
        f.col('RetentionRate_FTL4').alias('retentionRate'),
        f.col('Level of institution').alias('level'),
        'Top3Majors'
    )
)

In [0]:
collegedf.show(1)

+--------------------+--------+--------------------+-------------+----------------+--------------------+-------------+------+--------------------+
|             college|SAT_R_75|Undergrad_Enrollment|cost_per_year|In-state_Tuition|Out-of-state_Tuition|retentionRate| level|          Top3Majors|
+--------------------+--------+--------------------+-------------+----------------+--------------------+-------------+------+--------------------+
|Alabama A & M Uni...|     470|                4616|        22667|            9366|               17496|         NULL|4-year|Computer Engineer...|
+--------------------+--------+--------------------+-------------+----------------+--------------------+-------------+------+--------------------+
only showing top 1 row



**Note**: If line numbers are needed...<br>
Click on the left margin of a Cmd block and hit **_L_** on your keyboard

Load Data With Schema that isn't _inferred_

In [0]:
schema = StructType([
    StructField('name',StringType(), True),
    StructField('location',StringType(), True),
    StructField('rank',StringType(), True),
    StructField('description',StringType(), True),
    StructField('tuition_and_fees',DoubleType(), True),
    StructField('in-state',StringType(), True),
    StructField('undergraduate_enrollment',StringType(), True)    
])

rankings_coerce_types = (spark
                         .read
                         .format('csv')
                         .options(
                             schema=schema,
                             header=True
                             )
                         .load(path='dbfs:/FileStore/rankings.csv')
                        )

In [0]:
rankings_coerce_types.show(1)

+--------------------+-------------+----+--------------------+--------------------+--------+--------------------+
|                Name|     Location|Rank|         Description|    Tuition and fees|In-state|Undergrad Enrollment|
+--------------------+-------------+----+--------------------+--------------------+--------+--------------------+
|Princeton University|Princeton, NJ|   1|"Princeton, the f...|"" speaks to the ...|$45,320 |                null|
+--------------------+-------------+----+--------------------+--------------------+--------+--------------------+
only showing top 1 row



## Slice and Dice

In [0]:
collegedf.show(10)

+--------------------+--------+-----+--------------------+-------------+----------------+--------------------+-------------+------+--------------------+
|             college|SAT_R_75|state|Undergrad_Enrollment|cost_per_year|In-state_Tuition|Out-of-state_Tuition|retentionRate| level|          Top3Majors|
+--------------------+--------+-----+--------------------+-------------+----------------+--------------------+-------------+------+--------------------+
|Alabama A & M Uni...|     470|   AL|                4616|        22667|            9366|               17496|         NULL|4-year|Computer Engineer...|
|University of Ala...|     640|   AL|               12047|        22684|            8040|               18368|         NULL|4-year|Medieval and Rena...|
|  Amridge University|    NULL|   AL|                 293|        13380|            7180|                7180|         NULL|4-year|Journalism, Engli...|
|University of Ala...|     660|   AL|                6346|        22059|          

Learn about the data

In [0]:
# how many versions of level are there?
collegedf.select('level').distinct().collect()

Out[5]: [Row(level='2-year'), Row(level='4-year')]

In [0]:
# how many colleges are we considering?
collegedf.count()

Out[33]: 2662

### What Is The Most Common Major Offered?

First, let's split out the top 3 majors by parsing the Top3Majors column

In [0]:
collegedf2 =   collegedf.withColumn('major0', f.split(collegedf['Top3Majors'], ',').getItem(0)) \
                        .withColumn('major1', f.split(collegedf['Top3Majors'], ',').getItem(1)) \
                        .withColumn('major2', f.split(collegedf['Top3Majors'], ',').getItem(2)) \
                        .drop(f.col('Top3Majors'))
display(collegedf2.head(2))

college,SAT_R_75,state,Undergrad_Enrollment,cost_per_year,In-state_Tuition,Out-of-state_Tuition,retentionRate,level,major0,major1,major2
Alabama A & M University,470,AL,4616,22667,9366,17496,,4-year,Computer Engineering,Aviation,Neuroscience
University of Alabama at Birmingham,640,AL,12047,22684,8040,18368,,4-year,Medieval and Renaissance Studies,Computer and Information Science,Nursing


Next, let's combine the 3 majors' columns and stack them as if we are transposing the data

In [0]:
majors = ('major0', 'major1','major2')

collegeTbl = (
    collegedf2
        .withColumn('major',
            f.explode(f.array('major0', 'major1','major2'))
           )
        .drop(*majors)
    )

display(collegeTbl.head(5))

college,SAT_R_75,state,Undergrad_Enrollment,cost_per_year,In-state_Tuition,Out-of-state_Tuition,retentionRate,level,major
Alabama A & M University,470,AL,4616,22667,9366,17496,,4-year,Computer Engineering
Alabama A & M University,470,AL,4616,22667,9366,17496,,4-year,Aviation
Alabama A & M University,470,AL,4616,22667,9366,17496,,4-year,Neuroscience
University of Alabama at Birmingham,640,AL,12047,22684,8040,18368,,4-year,Medieval and Renaissance Studies
University of Alabama at Birmingham,640,AL,12047,22684,8040,18368,,4-year,Computer and Information Science


## Aggregating

In [0]:
# this user definited function (udf) will be used to simplify later code

# display( ...head(n)) can also be written like ...show(n)
# display is the html version of the table, show is the text version

# Top n items udf
def showTop(df, field, n = 10):
    display(df
     .groupby(field).count() 
     .orderBy( ['count'], ascending=False).head(n)
    )

Top 5 Majors

In [0]:
showTop(collegeTbl, 'major', 5)

major,count
Applied Mathematics,242
Nursing,237
Biology,211
English,182
Physics,95


Is there a relationship between State, SAT scores, and cost?

In [0]:
collegeTbl.show(3)

+--------------------+--------+--------------------+-------------+----------------+--------------------+-------------+------+--------------------+
|             college|SAT_R_75|Undergrad_Enrollment|cost_per_year|In-state_Tuition|Out-of-state_Tuition|retentionRate| level|               major|
+--------------------+--------+--------------------+-------------+----------------+--------------------+-------------+------+--------------------+
|Alabama A & M Uni...|     470|                4616|        22667|            9366|               17496|         NULL|4-year|Computer Engineering|
|Alabama A & M Uni...|     470|                4616|        22667|            9366|               17496|         NULL|4-year|            Aviation|
|Alabama A & M Uni...|     470|                4616|        22667|            9366|               17496|         NULL|4-year|        Neuroscience|
+--------------------+--------+--------------------+-------------+----------------+--------------------+-------------+

In [0]:
# show data types of each column
# this comes in handy when applying operations on fields that may be set to strings, even though they are expected to be numeric
collegedf.dtypes

Out[139]: [('college', 'string'),
 ('SAT_R_75', 'string'),
 ('state', 'string'),
 ('Undergrad_Enrollment', 'int'),
 ('cost_per_year', 'string'),
 ('In-state_Tuition', 'int'),
 ('Out-of-state_Tuition', 'int'),
 ('retentionRate', 'string'),
 ('level', 'string'),
 ('Top3Majors', 'string')]

In the data types above, we see that SAT_R_75 and cost_per_year are set to string values<br>

* However, we need them to be float values.<br>
* We then need to get averages by state.<br>
* And finally, we should rename the columns.<br><br>

#### We will need to apply the following:
* ___cast___ them as float
* since we are aggregating by multiple columns, which are being renamed, or ___aliased___, we will need to apply the agg() function

In [0]:
scoresdf = ( 
    collegeTbl
        .select( 'state', 
                f.col('SAT_R_75').cast('float'), 
                f.col('cost_per_year').cast('float')
               )
)

# get means
meansdf = ( scoresdf.groupBy('state')
    .agg(
        f.mean('SAT_R_75').alias('Avg_SAT'),
        f.mean('cost_per_year').alias('Avg_AnnualCost')
    )

)

meansdf.show(5)


+-----+-----------------+------------------+
|state|          Avg_SAT|    Avg_AnnualCost|
+-----+-----------------+------------------+
|   AZ|            596.0|20540.470588235294|
|   SC|548.0645161290323| 25876.01818181818|
|   LA|573.3333333333334|22802.757575757576|
|   MN| 588.551724137931| 26549.66176470588|
|   NJ|           561.92| 28679.43396226415|
+-----+-----------------+------------------+
only showing top 5 rows



#### Sorting
Sorting can be done by using __sort()__ or __orderby()__<br>

When applying sort to a single column, the syntax would look like: <br>
`df.sort('by_col', ascending=True)`

However when applying sort to multiple columns, brackets are required.  The syntax for both columns to be __ascending__ would be:<br>
`df.sort(['by_col1', 'by_col2', ascending = True)`<br><br>
If 1 column was ascending and 1 was descending, the syntax would be respectively:<br>
`df.sort(['by_col1', 'by_col2', ascending = [True, False])`

In [0]:
meansdf.sort(['state', 'Avg_SAT'], ascending=[True, False]).show(10)

+-----+-----------------+------------------+
|state|          Avg_SAT|    Avg_AnnualCost|
+-----+-----------------+------------------+
|   AK|            600.0|           20526.5|
|   AL|561.8333333333334| 22017.02380952381|
|   AR|          558.125| 19841.39024390244|
|   AZ|            596.0|20540.470588235294|
|   CA|602.1971830985915|28608.758293838862|
|   CO|590.7142857142857| 25351.61111111111|
|   CT|566.0714285714286| 30130.28205128205|
|   DC|            661.0|           43986.7|
|   DE|526.6666666666666|29828.333333333332|
|   FL|577.5675675675676|30335.214285714286|
+-----+-----------------+------------------+
only showing top 10 rows



#### Rounding and Decimal Formatting

In [0]:
meansdf2 = ( 
    meansdf
     .select(
         f.col('state').alias('State'), 
         f.round('Avg_SAT', 0).alias('SAT'), 
         f.col('Avg_AnnualCost').cast('decimal(12,2)').alias('Annual_Cost')
         #f.format_number('Avg_AnnualCost', 2).alias('Annual_Cost')  # This formats with commas, and looks nice in report, but ultimately
                                                                     #   converts the column into string values, which leads to more work in the end **
     )
      .sort(['state', 'Avg_SAT'], ascending=[True, False])
)

meansdf2.show(2)

+-----+-----+-----------+
|State|  SAT|Annual_Cost|
+-----+-----+-----------+
|   AK|600.0|   20526.50|
|   AL|562.0|   22017.02|
+-----+-----+-----------+
only showing top 2 rows



#### Applying Operations on Columns

In [0]:
meansdf2.dtypes

Out[123]: [('State', 'string'), ('SAT', 'double'), ('Annual_Cost', 'decimal(12,2)')]

In [0]:
( 
    meansdf2
     .withColumn(
         'cost_per_SAT_point', 
         (meansdf2['Annual_Cost'] / meansdf2['SAT'])
             .cast('Decimal(12,2)') 
     ).sort('cost_per_SAT_point', ascending=False)
).show(10)

+-----+-----+-----------+------------------+
|State|  SAT|Annual_Cost|cost_per_SAT_point|
+-----+-----+-----------+------------------+
|   VT|591.0|   42832.06|             72.47|
|   RI|630.0|   42341.91|             67.21|
|   MA|616.0|   41232.01|             66.94|
|   DC|661.0|   43986.70|             66.55|
|   PA|565.0|   34927.78|             61.82|
|   IN|571.0|   33421.83|             58.53|
|   DE|527.0|   29828.33|             56.60|
|   NY|597.0|   33705.45|             56.46|
|   NH|610.0|   32651.18|             53.53|
|   CT|566.0|   30130.28|             53.23|
+-----+-----+-----------+------------------+
only showing top 10 rows



In [0]:
 .cast('Decimal(12,2)')
            .sort('ratio', ascending=False)