<a href="https://colab.research.google.com/github/stbalaji/sparkpublic/blob/main/sparkClusterSubmit02.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# download spark3.0.0
!wget -q http://apache.osuosl.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz

In [None]:
# unzip it
!tar xf spark-3.0.1-bin-hadoop3.2.tgz

In [None]:
# install findspark 
!pip install -q findspark

In [None]:
import os

# Tell Spark where the JDK installed above and the extracted Spark distribution live.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.0.1-bin-hadoop3.2",
)

In [None]:
import findspark
findspark.init()  # make the pyspark under SPARK_HOME importable

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Smoke test: 1000 identical rows in a single string column "hello".
# Built from tuples plus column names: inferring a schema from dicts is deprecated
# in PySpark and emits a UserWarning.
df = spark.createDataFrame([("world",)] * 1000, ["hello"])
df.show(3, False)



+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows



In [None]:
import pyspark

# Confirm which PySpark build findspark put on the path
pyspark_version = pyspark.__version__
print(pyspark_version)

3.0.1


In [None]:
# List the working directory to check that the Spark archive and its extracted folder are present
!ls -la

total 218832
drwxr-xr-x  1 root root      4096 Feb  5 17:08 .
drwxr-xr-x  1 root root      4096 Feb  5 17:00 ..
drwxr-xr-x  1 root root      4096 Feb  4 15:26 .config
drwxr-xr-x  1 root root      4096 Feb  4 15:26 sample_data
drwxr-xr-x 13 1000 1000      4096 Aug 28 09:22 spark-3.0.1-bin-hadoop3.2
-rw-r--r--  1 root root 224062525 Aug 28 09:25 spark-3.0.1-bin-hadoop3.2.tgz


In [None]:
from pyspark.sql import SparkSession

# getOrCreate() hands back the already-running session if there is one,
# so re-running this cell does not start a second SparkContext.
spark = (SparkSession.builder
         .master("local[*]")
         .getOrCreate())

In [None]:
# Display the active SparkSession via its notebook repr
spark

In [None]:
# Colab ships a few example datasets under ./sample_data; show what is available
sample_files = os.listdir('./sample_data')
print(sample_files)

['README.md', 'anscombe.json', 'california_housing_train.csv', 'california_housing_test.csv', 'mnist_train_small.csv', 'mnist_test.csv']


In [None]:
# Load the California-housing training CSV: the first row is the header and
# Spark infers the column types.
file_loc = "./sample_data/california_housing_train.csv"
df_spark = spark.read.csv(file_loc, header=True, inferSchema=True)
print(type(df_spark))

<class 'pyspark.sql.dataframe.DataFrame'>


In [None]:
# Preview the first ten rows as a text table
df_spark.show(n=10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

In [None]:
!wget -q  http://www.grouplens.org/system/files/ml-100k.zip

In [None]:
!unzip ml-100k.zip

In [None]:
# Show the working directory; the relative ./ml-100k/ path used below resolves against it
!pwd

/content


In [None]:
from pyspark.sql.types import StringType, IntegerType, LongType, TimestampType, StructType, StructField

# Schema for MovieLens 100K `u.data` (tab-separated: user id, item id, rating, timestamp).
# - `userId` is spelled the way every downstream select refers to it.
# - Per the MovieLens README the timestamp is Unix epoch seconds (e.g. 881250949), not a
#   date string, so reading it as TimestampType from CSV cannot parse it. Keep it as an
#   integer and convert explicitly when needed, e.g. f.col("timestamp").cast("timestamp").
schema = StructType([StructField('userId', IntegerType(), True),
                     StructField('movieId', IntegerType(), True),
                     StructField('rating', IntegerType(), True),
                     StructField('timestamp', LongType(), True)])

In [None]:
# MovieLens 100K ratings summary, written as a spark-submit style script (hence the
# __main__ guard). Reads `schema` from the schema cell above.
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext  # NOTE(review): unused in this cell
# Namespaced on purpose: `from pyspark.sql.functions import *` would shadow Python
# builtins such as sum, max, min, round and abs for every later cell.
import pyspark.sql.functions as f

import operator  # NOTE(review): unused in this cell

if __name__ == "__main__":

    # In this notebook a local[*] session is already running, so getOrCreate() returns it
    # and the app name most likely does not take effect; it matters under spark-submit.
    spark = SparkSession.builder.appName("LessonOnClusterComputingMovielens").getOrCreate()

    # cols = ['user_id', 'item_id', 'rating','timestamp']
    filepath = "./ml-100k/"

    # u.data is tab-separated with no header row
    df_ratings = spark.read.csv(filepath + "u.data", schema=schema, sep="\t", header=False)

    print("=== Here is Schema of Ratings Data ===")
    df_ratings.printSchema()

    print("=== Total number of Ratings === : ", df_ratings.count())

    df_ratings_subset = df_ratings.select("userId", "movieId", "rating")  # Timestamp may not be needed for now
    print("=== Ratings with only Selected Columns === : ")
    df_ratings_subset.show()

    print("=== Print all the ratings for a particular Movie ===")
    df_ratings_subset.filter(df_ratings_subset.movieId == 100).show()

    # distinct()/orderBy() run in Spark; only the handful of distinct values reach the
    # driver (toPandas() would pull every rating row across first).
    print("=== Print the distinct rating values ===")
    distinct_ratings = [row["rating"] for row in
                        df_ratings.select("rating").distinct().orderBy("rating").collect()]
    print(distinct_ratings)

    print("=== Print the count of ratings ===")
    rating_counts = df_ratings.groupBy("rating").count()
    rating_counts.orderBy("rating").show()

    print("=== Print ratings by their counts in the descending order ===")
    (rating_counts
        .select("rating", f.col("count").alias("numOfRatings"))
        .orderBy(f.desc("numOfRatings"), "rating")  # secondary key makes ties deterministic
        .show())

    print("=== Print top 10 movies by number of ratings ===")
    (df_ratings.groupBy("movieId").count()
        .select("movieId", f.col("count").alias("numOfRatings"))
        .orderBy(f.desc("numOfRatings"), "movieId")  # secondary key makes ties deterministic
        .show(10))

    print("=== Exiting the code ===")

    # Stops the shared session; later cells that need Spark must call getOrCreate() again.
    spark.stop()

=== Here is Schema of Ratings Data ===
root
 |-- userid: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)

=== Total number of Ratings === :  100000
=== Ratings with only Selected Columns === : 
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|   196|    242|     3|
|   186|    302|     3|
|    22|    377|     1|
|   244|     51|     2|
|   166|    346|     1|
|   298|    474|     4|
|   115|    265|     2|
|   253|    465|     5|
|   305|    451|     3|
|     6|     86|     3|
|    62|    257|     2|
|   286|   1014|     5|
|   200|    222|     5|
|   210|     40|     3|
|   224|     29|     3|
|   303|    785|     3|
|   122|    387|     5|
|   194|    274|     2|
|   291|   1042|     4|
|   234|   1184|     2|
+------+-------+------+
only showing top 20 rows

=== Ratings with only Selected Columns === : 
+------+-------+------+
|userId|movieId|rating|
+------+-

In [None]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels

--2021-02-05 18:32:11--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 54.204.23.149, 34.193.233.154, 3.226.231.47, ...
Connecting to bin.equinox.io (bin.equinox.io)|54.204.23.149|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13773305 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip.1’


2021-02-05 18:32:12 (13.9 MB/s) - ‘ngrok-stable-linux-amd64.zip.1’ saved [13773305/13773305]

Archive:  ngrok-stable-linux-amd64.zip
replace ngrok? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: ngrok                   
{"tunnels":[{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://13c5258afc6f.ngrok.io","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":6,"gauge":0,"rate1":5.965349913224447e-8,"rate5":0.0011352891050691314,"rate15":0.002561786855325972,"p50":1793135.5,"p90":3203751,"p95":3203751,"p99":3

In [None]:
# Install useful stuff
! apt install --yes ssh screen nano htop ranger git > /dev/null# SSH setting
! echo "root:carbonara" | chpasswd
! echo "PasswordAuthentication yes" > /etc/ssh/sshd_config
! echo "PermitUserEnvironment yes" >> /etc/ssh/sshd_config
! echo "PermitRootLogin yes" >> /etc/ssh/sshd_config
! service ssh restart > /dev/null# Download ngrok
! wget -q -c -nc https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
! unzip -qq -n ngrok-stable-linux-amd64.zip# Run ngrok
authtoken = "1o4fP2v3NWaVtuedLz5FFjuVMDu_5CMQDiEfxBxKbgvfnw2vW"
get_ipython().system_raw('./ngrok authtoken $authtoken && ./ngrok tcp 22 &')
! sleep 3# Get the address for SSH
# Get the address for SSH: find the ngrok TCP tunnel and print a ready-to-use ssh command.
import time
from urllib.parse import urlsplit

import requests
from re import sub


def _find_tcp_tunnel_url(api_url="http://localhost:4040/api/tunnels", timeout_s=15.0):
    """Poll ngrok's local API until a tunnel with proto "tcp" is listed; return its public_url.

    The tunnel is selected by protocol rather than position: tunnels[0] can be another agent's
    HTTP tunnel, which produced the unusable "ssh root@https -p //....ngrok.io" before.

    Raises:
        RuntimeError: if no TCP tunnel shows up within ``timeout_s`` seconds.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            tunnels = requests.get(api_url, timeout=5).json()["tunnels"]
        except requests.exceptions.ConnectionError:
            tunnels = []  # agent not listening yet; keep polling
        for tunnel in tunnels:
            if tunnel.get("proto") == "tcp":
                return tunnel["public_url"]
        if time.monotonic() >= deadline:
            raise RuntimeError(f"no TCP tunnel listed at {api_url} after {timeout_s}s")
        time.sleep(1)


# public_url looks like tcp://HOST:PORT; urlsplit separates host and port reliably
tcp_url = urlsplit(_find_tcp_tunnel_url())
str_ssh = f"ssh root@{tcp_url.hostname} -p {tcp_url.port}"
print(str_ssh)



E: Unable to locate package SSH
E: Unable to locate package setting
ssh: unrecognized service
unzip:  cannot find or open ngrok-stable-linux-amd64.zip#, ngrok-stable-linux-amd64.zip#.zip or ngrok-stable-linux-amd64.zip#.ZIP.
sleep: invalid time interval ‘3#’
sleep: invalid time interval ‘Get’
sleep: invalid time interval ‘the’
sleep: invalid time interval ‘address’
sleep: invalid time interval ‘for’
sleep: invalid time interval ‘SSH’
Try 'sleep --help' for more information.
ssh root@https -p //13c5258afc6f.ngrok.io
