In [0]:
%fs ls dbfs:/databricks-datasets

In [0]:
# 3 types
# RDDs - Resilient Distributed Dataset
# Data Sets
# Data Frames

In [0]:
spark

In [0]:
sc

In [0]:
sc.textFile?

[0;31mSignature:[0m [0;32mdef[0m [0mtextFile[0m[0;34m([0m[0mname[0m[0;34m:[0m [0mstr[0m[0;34m,[0m [0mminPartitions[0m[0;34m:[0m [0mOptional[0m[0;34m[[0m[0mint[0m[0;34m][0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0muse_unicode[0m[0;34m:[0m [0mbool[0m[0;34m=[0m[0;32mTrue[0m[0;34m)[0m [0;34m->[0m [0mRDD[0m[0;34m[[0m[0mstr[0m[0;34m][0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Read a text file from HDFS, a local file system (available on all
nodes), or any Hadoop-supported file system URI, and return it as an
RDD of Strings. The text files must be encoded as UTF-8.

.. versionadded:: 0.7.0

Parameters
----------
name : str
    directory to the input data files, the path can be comma separated
    paths as a list of inputs
minPartitions : int, optional
    suggested minimum number of partitions for the resulting RDD
use_unicode : bool, default True
    If `use_unicode` is False, the strings will be kept as `str` (encoding
    as `utf-8

In [0]:
lines = sc.textFile('dbfs:/databricks-datasets/SPARK_README.md')

In [0]:
lines.getNumPartitions()

Out[3]: 2

In [0]:
lines = sc.textFile('dbfs:/databricks-datasets/SPARK_README.md',minPartitions=8)

In [0]:
lines.getNumPartitions()

Out[6]: 8

In [0]:
lines = sc.textFile('dbfs:/databricks-datasets/SPARK_README.md')

In [0]:
lines.count()

Out[12]: 95

In [0]:
lines.saveAsTextFile('myoutput')

In [0]:
dbutils.fs.ls(/user)

[0;36m  File [0;32m<command-3500131654022193>:1[0;36m[0m
[0;31m    dbutils.fs.ls(/user)[0m
[0m                  ^[0m
[0;31mSyntaxError[0m[0;31m:[0m invalid syntax


In [0]:
lines = sc.textFile('dbfs:/databricks-datasets/SPARK_README.md')

In [0]:
lines

Out[8]: dbfs:/databricks-datasets/SPARK_README.md MapPartitionsRDD[5] at textFile at NativeMethodAccessorImpl.java:0

In [0]:
lines.first()

Out[9]: '# Apache Spark'

In [0]:
lines.take(10)

Out[10]: ['# Apache Spark',
 '',
 'Spark is a fast and general cluster computing system for Big Data. It provides',
 'high-level APIs in Scala, Java, Python, and R, and an optimized engine that',
 'supports general computation graphs for data analysis. It also supports a',
 'rich set of higher-level tools including Spark SQL for SQL and DataFrames,',
 'MLlib for machine learning, GraphX for graph processing,',
 'and Spark Streaming for stream processing.',
 '',
 '<http://spark.apache.org/>']

In [0]:
help(lines.count)

In [0]:
help(lines.filter)

In [0]:
# transformations and actions
# transformations: filter, map, groupByKey, reduceByKey, reduce
# actions: count, collect, first, take, saveAsTextFile

In [0]:
lines.collect()

In [0]:
# narrow transformations: filter, map, flatMap
# wide transformations: groupByKey, reduceByKey, reduce
# shuffling
# DAG - Directed Acyclic Graph
# Lazy Evaluation

In [0]:
help(lines.filter)

Help on method filter in module pyspark.rdd:

filter(f: Callable[[~T], bool]) -> 'RDD[T]' method of pyspark.rdd.RDD instance
    Return a new RDD containing only the elements that satisfy a predicate.
    
    .. versionadded:: 0.7.0
    
    Parameters
    ----------
    f : function
        a function to run on each element of the RDD
    
    Returns
    -------
    :class:`RDD`
        a new :class:`RDD` by applying a function to each element
    
    See Also
    --------
    :meth:`RDD.map`
    
    Examples
    --------
    >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
    >>> rdd.filter(lambda x: x % 2 == 0).collect()
    [2, 4]



In [0]:
lines.filter(lambda line: line != '').take(10)

Out[3]: ['# Apache Spark',
 'Spark is a fast and general cluster computing system for Big Data. It provides',
 'high-level APIs in Scala, Java, Python, and R, and an optimized engine that',
 'supports general computation graphs for data analysis. It also supports a',
 'rich set of higher-level tools including Spark SQL for SQL and DataFrames,',
 'MLlib for machine learning, GraphX for graph processing,',
 'and Spark Streaming for stream processing.',
 '<http://spark.apache.org/>',
 '## Online Documentation',
 'You can find the latest Spark documentation, including a programming']

In [0]:
lines.filter(lambda line: line != '').count()

Out[4]: 60

In [0]:
help(lines.map)

Help on method map in module pyspark.rdd:

map(f: Callable[[~T], ~U], preservesPartitioning: bool = False) -> 'RDD[U]' method of pyspark.rdd.RDD instance
    Return a new RDD by applying a function to each element of this RDD.
    
    .. versionadded:: 0.7.0
    
    Parameters
    ----------
    f : function
        a function to run on each element of the RDD
    preservesPartitioning : bool, optional, default False
        indicates whether the input function preserves the partitioner,
        which should be False unless this is a pair RDD and the input
    
    Returns
    -------
    :class:`RDD`
        a new :class:`RDD` by applying a function to all elements
    
    See Also
    --------
    :meth:`RDD.flatMap`
    :meth:`RDD.mapPartitions`
    :meth:`RDD.mapPartitionsWithIndex`
    :meth:`RDD.mapPartitionsWithSplit`
    
    Examples
    --------
    >>> rdd = sc.parallelize(["b", "a", "c"])
    >>> sorted(rdd.map(lambda x: (x, 1)).collect())
    [('a', 1), ('b', 1), ('c', 

In [0]:
lines.take(10)

Out[6]: ['# Apache Spark',
 '',
 'Spark is a fast and general cluster computing system for Big Data. It provides',
 'high-level APIs in Scala, Java, Python, and R, and an optimized engine that',
 'supports general computation graphs for data analysis. It also supports a',
 'rich set of higher-level tools including Spark SQL for SQL and DataFrames,',
 'MLlib for machine learning, GraphX for graph processing,',
 'and Spark Streaming for stream processing.',
 '',
 '<http://spark.apache.org/>']

In [0]:
lines.map(lambda line: line.upper()).take(10)

Out[7]: ['# APACHE SPARK',
 '',
 'SPARK IS A FAST AND GENERAL CLUSTER COMPUTING SYSTEM FOR BIG DATA. IT PROVIDES',
 'HIGH-LEVEL APIS IN SCALA, JAVA, PYTHON, AND R, AND AN OPTIMIZED ENGINE THAT',
 'SUPPORTS GENERAL COMPUTATION GRAPHS FOR DATA ANALYSIS. IT ALSO SUPPORTS A',
 'RICH SET OF HIGHER-LEVEL TOOLS INCLUDING SPARK SQL FOR SQL AND DATAFRAMES,',
 'MLLIB FOR MACHINE LEARNING, GRAPHX FOR GRAPH PROCESSING,',
 'AND SPARK STREAMING FOR STREAM PROCESSING.',
 '',
 '<HTTP://SPARK.APACHE.ORG/>']

In [0]:
lines.map(lambda line: line.upper()).count()

Out[8]: 95

In [0]:
lines. \
    filter(lambda line: line != ''). \
    map(lambda line: line.upper()). \
    take(10)

Out[9]: ['# APACHE SPARK',
 'SPARK IS A FAST AND GENERAL CLUSTER COMPUTING SYSTEM FOR BIG DATA. IT PROVIDES',
 'HIGH-LEVEL APIS IN SCALA, JAVA, PYTHON, AND R, AND AN OPTIMIZED ENGINE THAT',
 'SUPPORTS GENERAL COMPUTATION GRAPHS FOR DATA ANALYSIS. IT ALSO SUPPORTS A',
 'RICH SET OF HIGHER-LEVEL TOOLS INCLUDING SPARK SQL FOR SQL AND DATAFRAMES,',
 'MLLIB FOR MACHINE LEARNING, GRAPHX FOR GRAPH PROCESSING,',
 'AND SPARK STREAMING FOR STREAM PROCESSING.',
 '<HTTP://SPARK.APACHE.ORG/>',
 '## ONLINE DOCUMENTATION',
 'YOU CAN FIND THE LATEST SPARK DOCUMENTATION, INCLUDING A PROGRAMMING']

In [0]:
s = 'and Spark Streaming for stream processing.'

In [0]:
type(s)

In [0]:
s.upper()

In [0]:
s.lower()

In [0]:
len(s)

In [0]:
s.lower().count('stream')

In [0]:
s.lower().count('s')

In [0]:
help(s.split)

In [0]:
s

In [0]:
s.split(' ')

In [0]:
help(lines.reduce)

In [0]:
lines.collect()

In [0]:
lines.map(lambda line: len(line)).take(10)

In [0]:
from operator import add

In [0]:
lines.map(lambda line: len(line)).reduce(add)

In [0]:
lines.map(lambda line: line.split(' ')).take(10)

In [0]:
help(lines.flatMap)

In [0]:
lines. \
    filter(lambda line: line != ''). \
    flatMap(lambda line: line.split(' ')). \
    take(10)

In [0]:
lines.count()

In [0]:
lines. \
    filter(lambda line: line != ''). \
    count()

In [0]:
lines. \
    filter(lambda line: line != ''). \
    flatMap(lambda line: line.split(' ')). \
    count()

In [0]:
help(lines.reduceByKey)

In [0]:
words = lines. \
    filter(lambda line: line != ''). \
    flatMap(lambda line: line.split(' '))

In [0]:
words.take(10)

In [0]:
words.map(lambda word: (word, 1)).take(10)

In [0]:
word_tuples = words.map(lambda word: (word.upper(), 1))

In [0]:
from operator import add
word_tuples. \
    reduceByKey(add). \
    take(10)

In [0]:
word_tuples. \
    reduceByKey(add). \
    count()

In [0]:
word_tuples. \
    reduceByKey(add). \
    collect()

In [0]:
word_count = word_tuples. \
    reduceByKey(add)

In [0]:
word_count.sortByKey?

In [0]:
word_count. \
    sortByKey(). \
    collect()

In [0]:
word_count. \
    map(lambda i: (i[1], i[0])). \
    sortByKey(False). \
    collect()

In [0]:
word_count. \
    map(lambda i: (i[1], i[0])). \
    sortByKey(False). \
    map(lambda i: (i[1], i[0])). \
    collect()

In [0]:
t = (22, 'THE')

In [0]:
(t[1], t[0])

In [0]:
f"{t[1]},{t[0]}"

In [0]:
wc_sorted = word_count. \
    map(lambda i: (i[1], i[0])). \
    sortByKey(False). \
    map(lambda i: f"{i[1]},{i[0]}")

In [0]:
wc_sorted.collect()

In [0]:
wc_sorted.saveAsTextFile?

In [0]:
wc_sorted.collect()

In [0]:

wc_sorted.saveAsTextFile('dbfs:/data/word_count')

In [0]:
output_df = sc.textFile('dbfs:/data/word_count')

In [0]:
output_df.take(10)