<a href="https://colab.research.google.com/github/j143/notebooks/blob/master/Recommendation_with_Pyspark%2BSystemDS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##### Copyright &copy; 2020 The Apache Software Foundation.

In [1]:
# @title Apache Version 2.0 (The "License");
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------

### Developer notebook for Apache SystemDS

Run this notebook online at [Google Colab ↗](https://colab.research.google.com/github/apache/systemds/blob/master/notebooks/systemds_dev.ipynb).




This Jupyter/Colab-based tutorial walks interactively through the development setup and through running SystemDS in two modes:

A. standalone mode \
B. with Apache Spark.

Flow of the notebook:
1. Download and Install the dependencies
2. Go to section **A** or **B**

#### Download and Install the dependencies

1. **Runtime:** Java (OpenJDK 8 is preferred)
2. **Build:** Apache Maven
3. **Backend:** Apache Spark (optional)

##### Setup

A custom function to run OS commands.

In [7]:
# Run and print a shell command.
def run(command):
  print('>> {}'.format(command))
  !{command}
  print('')
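
The `!{command}` syntax only works inside IPython/Colab. As a minimal sketch for running the same kind of helper from a plain Python script, assuming only the standard library (the name `run_cmd` is hypothetical and not part of this notebook):

```python
import subprocess

def run_cmd(command):
    """Echo a shell command, run it, then print and return its combined output."""
    print('>> {}'.format(command))
    # shell=True mirrors the notebook's `!{command}` behaviour; pass trusted strings only.
    result = subprocess.run(command, shell=True, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT, universal_newlines=True)
    print(result.stdout)
    return result.returncode, result.stdout
```

Returning the exit code makes it possible to stop early when a download or build step fails, which the notebook helper does not do.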

##### Install Java
Let us install OpenJDK 8. More about [OpenJDK ↗](https://openjdk.java.net/install/).

In [30]:
!apt-get update

Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease [3,626 B]
Hit:4 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/marutter/c2d4u3.5/ubuntu bionic InRelease
Get:11 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Fetched 256 kB in 2s (131 kB/s)
Reading package lists...


In [9]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Make Java 8 the default `java`, replacing any existing installation.
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

!java -version

openjdk version "1.8.0_265"
OpenJDK Runtime Environment (build 1.8.0_265-8u265-b01-0ubuntu2~18.04-b01)
OpenJDK 64-Bit Server VM (build 25.265-b01, mixed mode)
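
Since OpenJDK 8 is the preferred runtime, it can help to check the reported version programmatically. The following is a minimal sketch, assuming the version text has already been captured as a string (`java -version` writes it to stderr); the function name `java_major_version` is hypothetical.

```python
import re

def java_major_version(version_output):
    """Extract the major Java version from `java -version` output text."""
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_output)
    if match is None:
        raise ValueError("no version string found")
    first, second = match.group(1), match.group(2)
    # Java 8 and earlier report "1.x"; Java 9 and later report the major number first.
    if first == "1" and second is not None:
        return int(second)
    return int(first)

sample = 'openjdk version "1.8.0_265"'
print(java_major_version(sample))  # 8
```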


##### Install Apache Spark (optional, only needed for the Spark backend)

NOTE: If Spark fails to download, check that the requested version is still officially supported at
https://spark.apache.org/downloads.html

In [6]:
# Spark and Hadoop version
spark_version = 'spark-2.4.6'
hadoop_version = 'hadoop2.7'
spark_path = f"/opt/{spark_version}-bin-{hadoop_version}"
if not os.path.exists(spark_path):
  run(f"wget -q -nc -O apache-spark.tgz https://downloads.apache.org/spark/{spark_version}/{spark_version}-bin-{hadoop_version}.tgz")
  run('tar zxf apache-spark.tgz -C /opt')
  run('rm -f apache-spark.tgz')

os.environ["SPARK_HOME"] = spark_path
# Python does not expand $SPARK_HOME inside a string, so build the path explicitly.
os.environ["PATH"] += f":{spark_path}/bin"
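
When extending `PATH` from Python, note that shell variables such as `$SPARK_HOME` are not expanded inside string literals, so the directory has to be built explicitly. A minimal sketch of an idempotent helper follows; the name `add_to_path` and the `demo_env` dictionary are illustrative assumptions, not part of this notebook.

```python
import os

def add_to_path(directory, env=os.environ):
    """Append `directory` to PATH in `env` unless it is already listed."""
    entries = [p for p in env.get("PATH", "").split(os.pathsep) if p]
    if directory not in entries:
        entries.append(directory)
    env["PATH"] = os.pathsep.join(entries)
    return env["PATH"]

# A plain dict stands in for os.environ so the example has no side effects.
demo_env = {"PATH": "/usr/bin", "SPARK_HOME": "/opt/spark-2.4.6-bin-hadoop2.7"}
add_to_path(os.path.join(demo_env["SPARK_HOME"], "bin"), demo_env)
print(demo_env["PATH"])
```

Because the helper skips directories that are already present, re-running a setup cell does not keep growing `PATH`.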


In [25]:
# Download the Maven binary distribution.
maven_version = 'apache-maven-3.6.3'
maven_path = f"/opt/{maven_version}"

if not os.path.exists(maven_path):
  run(f"wget -q -nc -O apache-maven.zip https://downloads.apache.org/maven/maven-3/3.6.3/binaries/{maven_version}-bin.zip")
  run('unzip -q -d /opt apache-maven.zip')
  run('rm -f apache-maven.zip')

# Call Maven by its absolute path instead of relying on the $PATH environment variable.
def maven(args):
  run(f"{maven_path}/bin/mvn {args}")

maven('-v')

>> wget -q -nc -O apache-maven.zip https://downloads.apache.org/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.zip

>> unzip -q -d /opt apache-maven.zip

>> rm -f apache-maven.zip

>> /opt/apache-maven-3.6.3/bin/mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /opt/apache-maven-3.6.3
Java version: 1.8.0_265, vendor: Private Build, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "4.19.112+", arch: "amd64", family: "unix"



In [3]:
!pip install -q findspark

In [8]:
import findspark
findspark.init()

In [9]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("systemds").setMaster("local[*]")
sc = SparkContext(conf=conf)

In [10]:
from pyspark.sql import SparkSession

spark = SparkSession(sc)

#### Get Apache SystemDS

Apache SystemDS development happens on GitHub at [apache/systemds ↗](https://github.com/apache/systemds)

In [10]:
!git clone https://github.com/apache/systemds systemds --depth=1
%cd systemds

Cloning into 'systemds'...
remote: Enumerating objects: 7642, done.
remote: Counting objects: 100% (7642/7642), done.
remote: Compressing objects: 100% (4588/4588), done.
remote: Total 7642 (delta 5595), reused 3643 (delta 2967), pack-reused 0
Receiving objects: 100% (7642/7642), 14.79 MiB | 11.47 MiB/s, done.
Resolving deltas: 100% (5595/5595), done.
/content/systemds


In [None]:
!/opt/apache-maven-3.6.3/bin/mvn clean package -P distribution

In [33]:
%cd src/main/python

/content/systemds/src/main/python


In [35]:
!python pre_setup.py

In [None]:
!pip install .

In [22]:
!pip install systemds



In [11]:
%%sh

# Download dataset
curl -O http://snap.stanford.edu/data/amazon0601.txt.gz
gunzip amazon0601.txt.gz

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 11.2M  100 11.2M    0     0  2825k      0  0:00:04  0:00:04 --:--:-- 2825k


In [12]:
!sed -n 1,10p amazon0601.txt

# Directed graph (each unordered pair of nodes is saved once): Amazon0601.txt 
# Amazon product co-purchaisng network from June 01 2003
# Nodes: 403394 Edges: 3387388
# FromNodeId	ToNodeId
0	1
0	2
0	3
0	4
0	5
0	6


In [11]:
import pyspark.sql.functions as F


In [19]:
!ls

amazon0601.txt	conf		 dev	 docs	  notebooks  pom.xml	scripts
bin		CONTRIBUTING.md  docker  LICENSE  NOTICE     README.md	src


In [12]:
dataPath = "/content/amazon0601.txt"

X_train = (sc.textFile(dataPath)
    .filter(lambda l: not l.startswith("#"))
    .map(lambda l: l.split("\t"))
    .map(lambda prods: (int(prods[0]), int(prods[1]), 1.0))
    .toDF(("prod_i", "prod_j", "x_ij"))
    .filter("prod_i < 500 AND prod_j < 500") # Filter for memory constraints
    .cache())
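
For a quick sanity check without Spark, the same parsing steps (skip `#` comment lines, split on tabs, keep only product IDs below 500) can be sketched in plain Python. The function name `parse_edges` and the sample lines are illustrative assumptions; unlike the Spark pipeline, this sketch also skips blank lines.

```python
def parse_edges(lines, max_id=500):
    """Yield (prod_i, prod_j, 1.0) for edges whose IDs are both below max_id."""
    for line in lines:
        # Skip header comments and (unlike the Spark version) blank lines.
        if line.startswith("#") or not line.strip():
            continue
        fields = line.rstrip("\n").split("\t")
        prod_i, prod_j = int(fields[0]), int(fields[1])
        if prod_i < max_id and prod_j < max_id:
            yield (prod_i, prod_j, 1.0)

sample = ["# FromNodeId\tToNodeId", "0\t1", "0\t2", "3\t700"]
print(list(parse_edges(sample)))  # [(0, 1, 1.0), (0, 2, 1.0)]
```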

In [16]:
X_train

DataFrame[prod_i: bigint, prod_j: bigint, x_ij: double]

In [13]:
# Maximum product ID; at most 499 because of the filter above.
max_prod_i = X_train.select(F.max("prod_i")).first()[0]

In [14]:
max_prod_j = X_train.select(F.max("prod_j")).first()[0]

In [15]:
numProducts = max(max_prod_i, max_prod_j) + 1 # since 0-based indexing

In [16]:
print("Total number of products: {}".format(numProducts))

Total number of products: 500
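
The `+ 1` matters because product IDs start at 0, so an ID of 499 needs a matrix with 500 rows and columns. As a minimal sketch of how the `(prod_i, prod_j, x_ij)` triples map onto a dense square matrix, assuming plain Python lists and a hypothetical helper `to_dense` (the notebook itself does not define it):

```python
def to_dense(triples, num_products):
    """Build a num_products x num_products matrix from (i, j, value) triples."""
    matrix = [[0.0] * num_products for _ in range(num_products)]
    for i, j, x_ij in triples:
        matrix[i][j] = x_ij
    return matrix

triples = [(0, 1, 1.0), (0, 2, 1.0), (2, 3, 1.0)]
# The largest ID is 3, so 0-based indexing requires 4 rows and columns.
num_products = max(max(i, j) for i, j, _ in triples) + 1
dense = to_dense(triples, num_products)
print(num_products)  # 4
print(dense[2][3])   # 1.0
```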


In [1]:
import systemds
from systemds.context import SystemDSContext