# Prepare Environment

## a. Install pyspark package
The first step before running Spark is to install the ``pyspark`` package, because it is not included in the default Google Colab setup.
This step must be repeated every time we open a new session.

In [None]:
!pip install pyspark


## b. Create Spark session

In [None]:
import pyspark
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName('DataFrame Basics').getOrCreate()

# Explore Data

Twitter user dataset

| Field | Remarks |
| ----- | ------- |
| account_id | Unique account ID |
| handle | Twitter handle, e.g., [@JonBruner](https://twitter.com/jonbruner) |
| name | Full account name, e.g., "Jon Bruner" |
| description | Long account description, e.g., "Person who loves revelatory datasets that speak to eternal social characteristics." |
| url | URL associated with account description |
| language | [ISO language code](https://www.loc.gov/standards/iso639-2/php/code_list.php) selected by user |
| location | Free-text location provided by user |
| account_created_at | Date and time of account creation |
| account_created_at_interpolated | See below |
| crawled_at | Date and time when account was scraped for inclusion in this dataset |
| missing | 0: account exists<br>1: account never existed with this ID<br>2: account closed (by user, or by Twitter for abuse) |
| protected | 0: not protected<br>1: protected |
| followers_count | Number of Twitter accounts following this account |
| following_count | Number of Twitter accounts that this account follows |
| statuses_count | Number of public posts created by this account |
| listed_count | Number of lists on which this account appears |
| last_post_id | Unique ID of this account's most recent post, as of `crawled_at` |
| last_post_text | Text of this account's most recent post |
| last_post_lat | Latitude, if available, for this account's most recent post |
| last_post_lon | Longitude, if available, for this account's most recent post |
| last_post_place_id | [Twitter place ID](https://dev.twitter.com/overview/api/places), if available, for this account's most recent post |
| last_post_created_at | Date and time of this account's most recent post |
| time_since_last_post | Convenience variable: the difference in hours between `crawled_at` and `last_post_created_at` |
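
To make the definition of `time_since_last_post` concrete, here is a minimal sketch of the hour difference using Python's standard `datetime` module. The helper name `hours_between` and both timestamps are made up for illustration; they are not taken from the dataset, and the dataset's own column may have been computed differently.

```python
from datetime import datetime


def hours_between(crawled_at: datetime, last_post_created_at: datetime) -> float:
    """Hypothetical helper: elapsed hours from the last post to the crawl."""
    return (crawled_at - last_post_created_at).total_seconds() / 3600


# Illustrative timestamps only (not real rows from the CSV).
crawled = datetime(2016, 7, 1, 12, 0, 0)
last_post = datetime(2016, 6, 30, 6, 30, 0)

hours = hours_between(crawled, last_post)
print(hours)  # 29.5
```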

In [None]:
!wget https://www.dropbox.com/s/y83nb7kawnnfqky/twitter-user-5k.csv

In [None]:
!head twitter-user-5k.csv

In [None]:
df_twitter_user = spark.read.csv("twitter-user-5k.csv", header=True, inferSchema=True)

### 2.1 Taking a peek at the data

To see some of the records in a DataFrame, we can use ``show(n)``, which prints the first n rows.

In [None]:
df_twitter_user.show(5)

Showing the schema

In [None]:
df_twitter_user.printSchema()

Showing summary statistics

We can show summary statistics (count, mean, standard deviation, min, and max) with ``describe``. If we don't specify a column name, it computes the summary for every numeric and string column.

In [None]:
df_twitter_user.describe().show()

Summary statistics for a selected column

In [None]:
df_twitter_user.describe("account_id").show()

In [None]:
df_twitter_user.count()

### 2.2 Filtering

#### Select columns

In [None]:
df_twitter_user.select("handle").show()

Select multiple columns

In [None]:
df_twitter_user.select("account_id", "handle", "name").show()

#### Select records

In [None]:
df_twitter_user.filter(df_twitter_user.account_id > 50000).show()

String condition

In [None]:
df_twitter_user.filter(df_twitter_user.name.contains('John')).show()

In [None]:
df_twitter_user.filter(df_twitter_user.name.startswith('J')).show()

In [None]:
df_twitter_user.filter(df_twitter_user.name.endswith('r')).show()

In [None]:
df_twitter_user.filter(df_twitter_user.name.isNotNull()).show()

Filter with multiple conditions

In [None]:
# Combine conditions with & (and); wrap each comparison in its own parentheses.
df_twitter_user.filter(
    (df_twitter_user.language.isin('id', 'en'))
    & (df_twitter_user.followers_count > 100)
).show()

In [None]:
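from pyspark.sql.functions import col

# Keep only rows whose account_id can be cast to an integer
# (with ANSI mode off, a failed cast yields NULL, so such rows are dropped).
df_twitter_user.filter("CAST(account_id AS INT) IS NOT NULL").show(5)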

In [None]:
df_user = df_twitter_user.filter("CAST(account_id AS INT) IS NOT NULL")

In [None]:
df_user.show(5)

### 2.3 Aggregation

Number of columns

In [None]:
len(df_user.columns)

Total records count

In [None]:
df_user.count()

#### Distinct values

In [None]:
df_user.distinct().count()

Distinct values of selected columns

In [None]:
df_user.select('language').distinct().show()

In [None]:
df_user.select('language', 'missing').distinct().show()

#### Grouping by column

In [None]:
df_user.groupBy("language").count().show()

In [None]:
df_user.groupBy("language", 'missing').count().show()

Another way to compute count, min, and max, using ``agg``

In [None]:
# Note: importing sum, max, and min by name shadows Python's built-ins in this notebook.
from pyspark.sql.functions import col, sum, avg, max, min, count

df_user.groupBy("language") \
       .agg(count("*").alias("num_rec"),
            max("missing").alias("max_missing"),
            min("missing").alias("min_missing")) \
       .show()

#### More aggregation options

Order By

In [None]:
df_user.groupBy("language") \
       .agg(count("*").alias("num_rec"),
            max("missing").alias("max_missing"),
            min("missing").alias("min_missing")) \
       .orderBy("num_rec", ascending=False) \
       .show()

Aggregation with a condition on the aggregated value

In [None]:
df_user.groupBy("language") \
       .agg(count("*").alias("num_rec"),
            max("missing").alias("max_missing"),
            min("missing").alias("min_missing")) \
       .where(col("num_rec") > 10) \
       .show()