## Twitter Gender Classification
### Final Project

#### University of California, Santa Barbara
#### PSTAT 135: Big Data Analytics

Source: https://www.kaggle.com/crowdflower/twitter-user-gender-classification

### Dataset Preprocessing

The dataset contains about 20,000 rows, each with a user name, a random tweet, account profile and image, location, and link and sidebar color. All tweets were posted on October 26, 2015.

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName("comm") \
        .getOrCreate()

tweets = spark.read.csv('gender_data.csv', header = True)

In [2]:
type(tweets)

pyspark.sql.dataframe.DataFrame

In [3]:
tweets.count()

24230

In [4]:
tweets.columns

['_unit_id',
 '_golden',
 '_unit_state',
 '_trusted_judgments',
 '_last_judgment_at',
 'gender',
 'gender:confidence',
 'profile_yn',
 'profile_yn:confidence',
 'created',
 'description',
 'fav_number',
 'gender_gold',
 'link_color',
 'name',
 'profile_yn_gold',
 'profileimage',
 'retweet_count',
 'sidebar_color',
 'text',
 'tweet_coord',
 'tweet_count',
 'tweet_created',
 'tweet_id',
 'tweet_location',
 'user_timezone']

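**We removed the columns `_unit_id`, `_golden`, `_unit_state`, `_trusted_judgments`, `_last_judgment_at`, `gender:confidence`, `profile_yn`, `profile_yn:confidence`, `gender_gold`, `profile_yn_gold`, `link_color`, `profileimage`, `sidebar_color`, `description`, `name`, `tweet_coord`, `tweet_location`, `user_timezone`, and `tweet_id` because they are not relevant to our model.**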

In [5]:
columns_to_drop = ['_unit_id','_golden','_unit_state','_trusted_judgments','_last_judgment_at',
                   'gender:confidence','profile_yn','profile_yn:confidence','gender_gold',
                   'profile_yn_gold','link_color','profileimage','sidebar_color','description','name',
                   'tweet_coord','tweet_location','user_timezone','tweet_id']
tweets = tweets.drop(*columns_to_drop)
tweets.columns

['gender',
 'created',
 'fav_number',
 'retweet_count',
 'text',
 'tweet_count',
 'tweet_created']

**Besides `male`, `female`, and `brand`, the `gender` column contains other labels, so we keep only the rows labeled with one of these three.**

In [6]:
tweets = tweets.filter((tweets.gender == 'male') | (tweets.gender == 'female') | (tweets.gender == 'brand'))

In [7]:
tweets.count()

18836

**After filtering on `gender`, 18,836 rows remain. Next, we check whether every row has a non-null value in the `text` column.**

In [8]:
# Number of non-null values in "text"
tweets.filter(tweets.text.isNotNull()).count()

17748

Only 17,748 of the 18,836 rows have a non-null `text`, so 1,088 rows are missing their tweet. Because the tweet text is central to our model, we remove these rows.

In [9]:
tweets = tweets.filter(tweets.text.isNotNull())

**For the columns `fav_number`, `retweet_count`, and `tweet_count`, we check for null values.**

- If more than 10% of a column's values are null, we leave that column out of the model.

- If fewer than 10% are null, we replace the nulls with the column's median. We chose the median over the mean because these columns are counts: the mean could be a fraction such as 77.5, and nothing can be retweeted 77.5 times. The median is also less sensitive than the mean to a few extremely active accounts.
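The rule above can be written as a small plain-Python helper. The helper is a sketch and does not run on Spark. `plan_null_handling` and its `threshold` argument are hypothetical names, and the counts passed in are the null counts reported below for the 17,748 rows with non-null text. The rule does not say what happens at exactly 10%; this sketch imputes in that case.

```python
def plan_null_handling(null_counts, n_rows, threshold=0.10):
    """Decide, per column, whether to keep, impute, or drop it under the 10% rule."""
    plan = {}
    for column, nulls in null_counts.items():
        share = nulls / n_rows
        if nulls == 0:
            plan[column] = "keep"      # nothing to replace
        elif share > threshold:
            plan[column] = "drop"      # too many nulls: leave the column out of the model
        else:
            plan[column] = "impute"    # few nulls: fill them with the median
    return plan


# Null counts for fav_number, retweet_count and tweet_count, out of 17,748 rows
print(plan_null_handling({"fav_number": 0, "retweet_count": 0, "tweet_count": 1259}, 17748))
# {'fav_number': 'keep', 'retweet_count': 'keep', 'tweet_count': 'impute'}
```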

`fav_number`: there are no null values, so nothing needs to be replaced.

In [10]:
# Number of null values in "fav_number"
tweets.filter(tweets.fav_number.isNull()).count()

0

`retweet_count`: there are no null values, so nothing needs to be replaced.

In [11]:
# Number of null values in "retweet_count"
tweets.filter(tweets.retweet_count.isNull()).count()

0

`tweet_count`: there are 1,259 null values, about 7.1% of the 17,748 remaining rows. Since that is under 10%, we replace these nulls with the column's median.

In [12]:
# Number of null values in "tweet_count"
tweets.filter(tweets.tweet_count.isNull()).count()

1259

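We use Spark ML's **Imputer**, which fills missing values in the specified columns with the column mean (the default) or median. Here we set the strategy to median. `Imputer` works only on numeric columns, so we first cast `tweet_count` from string to integer.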
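The sketch below shows median imputation on a plain Python list. Spark's `Imputer` estimates the median with an approximate quantile algorithm, so its fill value can differ slightly from this exact version. `impute_median` is a hypothetical helper. It uses `statistics.median_low`, which always returns one of the observed values, so integer counts stay integers even when the number of non-null values is even.

```python
from statistics import median_low


def impute_median(values):
    """Replace None entries with the median of the non-null values."""
    observed = [v for v in values if v is not None]
    # median_low picks the lower of the two middle values when the count is even,
    # so the fill value is always an element of the data
    fill = median_low(observed)
    return [fill if v is None else v for v in values]


print(impute_median([4, None, 2, 8, 6]))  # [4, 4, 2, 8, 6]
```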

In [13]:
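# Replace nulls in "tweet_count" with the column median
from pyspark.ml.feature import Imputer
from pyspark.sql.types import IntegerType

# The CSV was read without a schema, so every column is a string; Imputer needs a numeric column.
# Values that cannot be parsed as integers also become null here and are imputed too.
tweets = tweets.withColumn("tweet_count", tweets["tweet_count"].cast(IntegerType()))
imputer = Imputer(inputCols=["tweet_count"], outputCols=["tweet_count_new"]).setStrategy("median")
tweets = imputer.fit(tweets).transform(tweets)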

Now we will check to see if there are any null values left in the column.

In [14]:
tweets.filter(tweets.tweet_count_new.isNull()).count()

0

**We want the number of years each user had held their account as of the date of the tweet in our data. We add a column with this count, called `account_years`.**

In [15]:
tweets.select('created').show(3)

+--------------+
|       created|
+--------------+
|  12/5/13 1:48|
| 10/1/12 13:51|
|11/28/14 11:30|
+--------------+
only showing top 3 rows



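All tweets were posted in 2015. The dataset description states they were all posted on October 26, 2015, and every `tweet_created` value shown in this notebook reads `10/26/15`. We therefore compute `account_years` as 15 minus the two-digit year in `created`.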
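The 2015 claim can be checked directly instead of relying on the rows shown, by collecting the distinct years in `tweet_created`. The plain-Python sketch below does the parsing. `posted_years` is a hypothetical helper. Its input list could come from something like `[row['tweet_created'] for row in tweets.select('tweet_created').distinct().collect()]`, and the check passes only if the result is `{'15'}`.

```python
def posted_years(timestamps):
    """Distinct two-digit years in timestamps formatted like '10/26/15 12:40'."""
    # 'M/D/YY H:MM' -> date part 'M/D/YY' -> two-digit year 'YY'
    return {ts.split(' ')[0].split('/')[2] for ts in timestamps}


print(posted_years(['10/26/15 12:40', '10/26/15 12:40']))  # {'15'}
```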

In [16]:
# Using the RDD: take the date part of "created" (M/D/YY), keep the two-digit year, and subtract it from 15 (i.e. 2015)
rdd = tweets.rdd
years = rdd.map(lambda row: row['created'].split(' ')) \
                .map(lambda x: x[0]) \
                .map(lambda x: x.split('/')) \
                .map(lambda x: x[2]) \
                .map(lambda x: (15-int(x))) \
                .map(lambda x: (x, )).toDF(['account_years'])
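The chain of `map` calls above applies the same per-row logic as this plain-Python function. The function is a sketch for testing the parsing without Spark. `account_years` here is a hypothetical helper, and `reference_year=15` encodes the assumption that every tweet is from 2015.

```python
def account_years(created, reference_year=15):
    """Difference between 2015 and the account's creation year ('M/D/YY H:MM')."""
    date_part = created.split(' ')[0]               # e.g. '12/5/13'
    two_digit_year = int(date_part.split('/')[2])   # e.g. 13
    return reference_year - two_digit_year


for created in ['12/5/13 1:48', '10/1/12 13:51', '11/28/14 11:30']:
    print(created, account_years(created))  # 2, 3 and 1, as in the table below
```

The result is a difference of calendar years, not elapsed time. An account created on 12/5/13 gets 2 even though it was not yet two years old on 10/26/15.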

In [17]:
print(years.account_years)
tweets.show(2)

Column<b'account_years'>
+------+-------------+----------+-------------+--------------------+-----------+--------------+---------------+
|gender|      created|fav_number|retweet_count|                text|tweet_count| tweet_created|tweet_count_new|
+------+-------------+----------+-------------+--------------------+-----------+--------------+---------------+
|  male| 12/5/13 1:48|         0|            0|Robbie E Responds...|     110964|10/26/15 12:40|         110964|
|  male|10/1/12 13:51|        68|            0|���It felt like t...|       7471|10/26/15 12:40|           7471|
+------+-------------+----------+-------------+--------------------+-----------+--------------+---------------+
only showing top 2 rows



In [18]:
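# Attach the computed years to "tweets" as "account_years" by numbering the rows of both DataFrames and joining on that index.
# This relies on both DataFrames keeping the same row order (years was mapped row by row from tweets.rdd).
# A Window without partitionBy also moves all rows into a single partition, which is slow on large data.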

from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

tweets_new=tweets.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
years_new=years.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))

tweets_years = tweets_new.join(years_new, on=["row_index"]).drop("row_index")

In [19]:
tweets = tweets_years
tweets.show(3)

+------+--------------+----------+-------------+--------------------+-----------+--------------+---------------+-------------+
|gender|       created|fav_number|retweet_count|                text|tweet_count| tweet_created|tweet_count_new|account_years|
+------+--------------+----------+-------------+--------------------+-----------+--------------+---------------+-------------+
|  male|  12/5/13 1:48|         0|            0|Robbie E Responds...|     110964|10/26/15 12:40|         110964|            2|
|  male| 10/1/12 13:51|        68|            0|���It felt like t...|       7471|10/26/15 12:40|           7471|            3|
|  male|11/28/14 11:30|      7696|            1|i absolutely ador...|       5617|10/26/15 12:40|           5617|            1|
+------+--------------+----------+-------------+--------------------+-----------+--------------+---------------+-------------+
only showing top 3 rows



### Exploratory Data Analysis (EDA)

We check the number of rows for each `gender` label. For each label, we also compute the average number of favorites (`fav_number`), retweets (`retweet_count`), tweets (`tweet_count`), and account years (`account_years`).

In [None]:
# counts
tweets.groupBy('gender').count().orderBy('count',ascending=False).show()

In [None]:
# brand averages
tweets.filter(tweets.gender == "brand") \
            .agg({'fav_number':'avg','retweet_count':'avg','tweet_count':'avg', 'account_years':'avg'}).show()

In [None]:
# female averages
tweets.filter(tweets.gender == "female") \
            .agg({'fav_number':'avg','retweet_count':'avg','tweet_count':'avg', 'account_years':'avg'}).show()

In [None]:
# male averages
tweets.filter(tweets.gender == "male") \
            .agg({'fav_number':'avg','retweet_count':'avg','tweet_count':'avg', 'account_years':'avg'}).show()

### Natural Language Processing (NLP)

#### We derived features based on casing, punctuation, and emojis on the tweets in the `text` column.

**Casing**

In [20]:
# Count words that are entirely upper-case (True) vs. all other words (False)

words = rdd.flatMap(lambda row: row['text'].split(' '))
wordcounts = words.map(lambda word: word.isupper())
wordcounts.countByValue()

defaultdict(int, {False: 252068, True: 12995})

In [21]:
12995/(12995+252068)

0.04902608059216111

There are 12,995 entirely upper-case words in the `text` column, about 4.9% of the 265,063 space-separated words. Because upper-case words are so rare, we do not expect casing to weigh heavily in predicting gender.
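The plain-Python sketch below makes explicit what `str.isupper()` counts. `uppercase_share` is a hypothetical helper that mirrors the RDD computation above on a list of strings. `isupper()` is True only when a token has at least one cased letter and all its cased letters are upper-case. The "False" bucket therefore holds mixed-case words, lower-case words, and tokens without letters.

```python
def uppercase_share(texts):
    """Fraction of space-separated tokens for which str.isupper() is True."""
    tokens = [tok for text in texts for tok in text.split(' ')]
    upper = sum(1 for tok in tokens if tok.isupper())
    return upper / len(tokens)


# 'I' and 'LOVE' count as upper-case; 'Robbie' (mixed case) and '123' (no letters) do not
print(uppercase_share(['I LOVE Robbie', '123']))  # 0.5
```

The single-letter pronoun "I" counts as upper-case, so some of the 12,995 upper-case words are likely just "I".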

**Punctuation**

In [22]:
# Count punctuation (!, ?, .)
from string import punctuation
from collections import Counter

counts = Counter(str(tweets.select('text').take(17748)))
punctuation_counts = {k:v for k, v in counts.items() if k in punctuation}
punctuation_counts

{'[': 96,
 '(': 18409,
 '=': 17774,
 "'": 32906,
 '#': 5809,
 ':': 11900,
 '/': 21656,
 '.': 21515,
 ')': 18566,
 ',': 22674,
 '\\': 430,
 '"': 13310,
 '@': 7858,
 '-': 2266,
 '?': 1707,
 '!': 3546,
 '+': 129,
 '_': 7190,
 '%': 106,
 '&': 679,
 ';': 811,
 ']': 95,
 '|': 132,
 '$': 205,
 '~': 52,
 '*': 291,
 '^': 26,
 '{': 11,
 '}': 8,
 '`': 5}
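The counts above are taken over the string form of `Row` objects, so the `Row(text='...')` wrappers are counted along with the tweets. The plain-Python sketch below counts over the raw tweet strings only. In Spark, those strings could be taken as `row['text']` from each collected row. `punctuation_counts` and `punctuation_total` are hypothetical helpers. The example uses the first tweet after lowercasing and shows that the wrapper adds five characters.

```python
from collections import Counter
from string import punctuation


def punctuation_counts(texts):
    """Count each punctuation character across the raw tweet strings."""
    counts = Counter()
    for text in texts:
        counts.update(ch for ch in text if ch in punctuation)
    return counts


def punctuation_total(text):
    """Number of punctuation characters in a single tweet."""
    return sum(1 for ch in text if ch in punctuation)


tweet = ('robbie e responds to critics after win against eddie edwards '
         'in the #worldtitleseries https://t.co/nsybbmvjkz')
print(punctuation_total(tweet))                   # 6: '#', ':', three '/', '.'
print(punctuation_total('Row(text=%r)' % tweet))  # 11: the wrapper adds ( = ' ' )
```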

**Emojis**

In our copy of the dataset, emojis appear as the Unicode replacement character '�' (U+FFFD), so we flag each tweet that contains this character. We add a boolean column `emojis`: false (0) means no emojis are present and true (1) means at least one is. Any other character that failed to decode also shows up as '�', so it would be flagged as well.
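The per-tweet test is a plain substring check, sketched below in Python. `has_emoji` is a hypothetical helper, and the sketch assumes, as stated above, that '�' (`'\ufffd'`) is the only marker of an emoji in this data.

```python
REPLACEMENT_CHAR = '\ufffd'  # the '�' character that stands in for emojis in this data


def has_emoji(text):
    """1 if the tweet contains the replacement character, else 0."""
    return int(REPLACEMENT_CHAR in text)


print(has_emoji('\ufffd\ufffd\ufffdIt felt like'))  # 1
print(has_emoji('robbie e responds to critics'))    # 0
```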

In [23]:
# Emojis: True if the tweet contains the replacement character '�'
emojis_TF = tweets.select("text").rdd \
    .map(lambda tweet: str(tweet)) \
    .map(lambda x: '�' in x) \
    .map(lambda x: (x, )) \
    .toDF(['emojis'])
emojis_TF.show(3)

+------+
|emojis|
+------+
| false|
|  true|
| false|
+------+
only showing top 3 rows



#### Next we preprocess the tweets, tracking each change on the first row of `text`.

**Casing**

In [24]:
tweets.select("text").show(1, truncate=False)

+-------------------------------------------------------------------------------------------------------------+
|text                                                                                                         |
+-------------------------------------------------------------------------------------------------------------+
|Robbie E Responds To Critics After Win Against Eddie Edwards In The #WorldTitleSeries https://t.co/NSybBmVjKZ|
+-------------------------------------------------------------------------------------------------------------+
only showing top 1 row



In [25]:
# Lowercase words
from pyspark.sql.functions import lower, col
tweets = tweets.withColumn("text",lower(col('text')))
tweets.select("text").show(1, truncate=False)

+-------------------------------------------------------------------------------------------------------------+
|text                                                                                                         |
+-------------------------------------------------------------------------------------------------------------+
|robbie e responds to critics after win against eddie edwards in the #worldtitleseries https://t.co/nsybbmvjkz|
+-------------------------------------------------------------------------------------------------------------+
only showing top 1 row



**Tokenize**

In [26]:
# Tokenizer
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tweets = tokenizer.transform(tweets)
tweets.select("words").show(1, truncate=False)

+----------------------------------------------------------------------------------------------------------------------------+
|words                                                                                                                       |
+----------------------------------------------------------------------------------------------------------------------------+
|[robbie, e, responds, to, critics, after, win, against, eddie, edwards, in, the, #worldtitleseries, https://t.co/nsybbmvjkz]|
+----------------------------------------------------------------------------------------------------------------------------+
only showing top 1 row



**Stop Word Removal**

In [27]:
# Stop Word Removal
from pyspark.ml.feature import StopWordsRemover
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
tweets = remover.transform(tweets)
tweets.select("filtered").show(1, truncate=False)

+-----------------------------------------------------------------------------------------------+
|filtered                                                                                       |
+-----------------------------------------------------------------------------------------------+
|[robbie, e, responds, critics, win, eddie, edwards, #worldtitleseries, https://t.co/nsybbmvjkz]|
+-----------------------------------------------------------------------------------------------+
only showing top 1 row
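`StopWordsRemover` drops every token that appears in its stop-word list. By default it compares case-insensitively and uses a built-in English list. The plain-Python sketch below applies the same filtering rule to the tokenized first tweet. `STOP_WORDS` is a tiny hypothetical subset, chosen to contain just the words removed in the output above. Spark's default list is much longer.

```python
# Tiny hypothetical subset of an English stop-word list (Spark's default list is much longer)
STOP_WORDS = {"to", "after", "against", "in", "the"}


def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Drop tokens whose lower-case form is a stop word (case-insensitive matching)."""
    return [tok for tok in tokens if tok.lower() not in stop_words]


tokens = ['robbie', 'e', 'responds', 'to', 'critics', 'after', 'win', 'against',
          'eddie', 'edwards', 'in', 'the', '#worldtitleseries', 'https://t.co/nsybbmvjkz']
print(remove_stop_words(tokens))
```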



**Stemming**

In [28]:
from pyspark.sql.functions import udf
from nltk.stem.snowball import SnowballStemmer
from pyspark.sql.types import ArrayType, StringType

stemmer = SnowballStemmer(language='english')
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
stem_tweets = tweets.withColumn("stemmed", stemmer_udf("filtered")).select("stemmed")
stem_tweets.select("stemmed").show(1, truncate=False)

+----------------------------------------------------------------------------------------+
|stemmed                                                                                 |
+----------------------------------------------------------------------------------------+
|[robbi, e, respond, critic, win, eddi, edward, #worldtitleseri, https://t.co/nsybbmvjkz]|
+----------------------------------------------------------------------------------------+
only showing top 1 row



##### HashingTF for stemmed tweets

In [29]:
from pyspark.mllib.feature import HashingTF
tf = HashingTF(numFeatures=10000)
stem_features = stem_tweets.rdd.map(lambda tweet: tf.transform(tweet[0])).map(lambda x: (x, )).toDF(['text_as_vec'])

In [30]:
stem_features.show(5)

+--------------------+
|         text_as_vec|
+--------------------+
|(10000,[162,1521,...|
|(10000,[213,1065,...|
|(10000,[64,2016,2...|
|(10000,[0,997,107...|
|(10000,[956,3905,...|
+--------------------+
only showing top 5 rows
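`HashingTF` applies the hashing trick. Each token is hashed to one of `numFeatures` indices, and the vector stores how often each index was hit, so distinct tokens can occasionally collide. The plain-Python sketch below illustrates the idea. `hashing_tf` is a hypothetical helper, and `zlib.crc32` is a swapped-in hash chosen only for reproducibility. Spark uses its own hash functions, so these indices will not match the ones shown above.

```python
import zlib


def hashing_tf(tokens, num_features=10000):
    """Term frequencies in a fixed index space via the hashing trick.

    zlib.crc32 is a stand-in hash; Spark's indices come from a different hash function.
    """
    counts = {}
    for tok in tokens:
        index = zlib.crc32(tok.encode('utf-8')) % num_features
        counts[index] = counts.get(index, 0.0) + 1.0
    return num_features, counts  # analogous to SparseVector(size, {index: count})


size, counts = hashing_tf(['robbi', 'e', 'respond', 'critic', 'win', 'win'])
print(size, sum(counts.values()))  # 10000 6.0
```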



**Bigrams**

In [31]:
from pyspark.ml.feature import NGram
bigram = NGram(n=2, inputCol="stemmed", outputCol="bigrams")
bigram_tweets = bigram.transform(stem_tweets)
bigram_tweets.select("bigrams").show(1, truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------+
|bigrams                                                                                                                                 |
+----------------------------------------------------------------------------------------------------------------------------------------+
|[robbi e, e respond, respond critic, critic win, win eddi, eddi edward, edward #worldtitleseri, #worldtitleseri https://t.co/nsybbmvjkz]|
+----------------------------------------------------------------------------------------------------------------------------------------+
only showing top 1 row
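`NGram` with `n=2` joins each pair of consecutive tokens with a space. A tweet with fewer than two tokens gets no bigrams. The plain-Python sketch below reproduces the bigrams shown above from the stemmed tokens of the first tweet. `ngrams` is a hypothetical helper.

```python
def ngrams(tokens, n=2):
    """Space-joined n-grams of consecutive tokens; empty if there are fewer than n tokens."""
    return [' '.join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


stemmed = ['robbi', 'e', 'respond', 'critic', 'win', 'eddi', 'edward',
           '#worldtitleseri', 'https://t.co/nsybbmvjkz']
print(ngrams(stemmed))
```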



In [62]:
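from pyspark.mllib.feature import HashingTF

tf = HashingTF(numFeatures=10000)
# Hash the "bigrams" column: tweet[0] is the "stemmed" column, so hashing it would only duplicate
# text_as_vec (this is why bigrams equals text_as_vec in the recorded outputs below).
# The vectors stay sparse rather than being converted with Vectors.dense, which would
# materialize 10,000 values per tweet.
bigram_features = bigram_tweets.rdd.map(lambda tweet: (tf.transform(tweet['bigrams']), )).toDF(['bigrams'])
bigram_features.show(3)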

##### Adding columns for counts of punctuation

In [33]:
def punc_func(tweet):
    """Sum the counts of all punctuation characters in a Counter of characters."""
    total = 0
    for key in tweet:
        if key in punctuation:
            total += tweet[key]
    return total

In [34]:
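# Caveat: str(tweet) is the string "Row(text='...')", so each count includes at least five wrapper
# characters ('(', ')', '=' and two quote characters). For the first tweet, only 6 of the 11 counted
# characters come from the text itself; counting over tweet['text'] would avoid this.
punc = tweets.select("text").rdd.map(lambda tweet: Counter(str(tweet))).map(lambda x: punc_func(x)).map(lambda x: (x, )).toDF(['punc'])
punc.show(5)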

+----+
|punc|
+----+
|  11|
|  18|
|   5|
|  19|
|  15|
+----+
only showing top 5 rows



#### Adding Columns to tweets DF

In [53]:
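# Number the rows of each DataFrame 1..n so the feature columns can be joined back to tweets by position
tweets_new = tweets.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))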
emojis_new = emojis_TF.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
punc_new = punc.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
stem_features_new = stem_features.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
bigram_features_new = bigram_features.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))

In [54]:
tweets_final = tweets_new.join(emojis_new, on=["row_index"]).join(punc_new, on=["row_index"]).join(stem_features_new, on=["row_index"]).join(bigram_features_new, on=["row_index"])

In [37]:
tweets_final.take(1)

[Row(row_index=1, gender='male', created='12/5/13 1:48', fav_number='0', retweet_count='0', text='robbie e responds to critics after win against eddie edwards in the #worldtitleseries https://t.co/nsybbmvjkz', tweet_count=110964, tweet_created='10/26/15 12:40', tweet_count_new=110964, account_years=2, words=['robbie', 'e', 'responds', 'to', 'critics', 'after', 'win', 'against', 'eddie', 'edwards', 'in', 'the', '#worldtitleseries', 'https://t.co/nsybbmvjkz'], filtered=['robbie', 'e', 'responds', 'critics', 'win', 'eddie', 'edwards', '#worldtitleseries', 'https://t.co/nsybbmvjkz'], emojis=False, punc=11, text_as_vec=SparseVector(10000, {162: 1.0, 1521: 1.0, 1785: 1.0, 4032: 1.0, 5505: 1.0, 5993: 1.0, 6493: 1.0, 7437: 1.0, 9022: 1.0}), bigrams=SparseVector(10000, {162: 1.0, 1521: 1.0, 1785: 1.0, 4032: 1.0, 5505: 1.0, 5993: 1.0, 6493: 1.0, 7437: 1.0, 9022: 1.0}))]

In [38]:
tweets_final.columns

['row_index',
 'gender',
 'created',
 'fav_number',
 'retweet_count',
 'text',
 'tweet_count',
 'tweet_created',
 'tweet_count_new',
 'account_years',
 'words',
 'filtered',
 'emojis',
 'punc',
 'text_as_vec',
 'bigrams']

#### Next, we apply `VectorAssembler` to prepare the data for various ML models.

##### Planned features: hashed word counts, hashed bigram counts, punctuation count, emoji indicator, tweet count, favorite count, retweet count, and account years
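`VectorAssembler` concatenates its input columns into one vector, in the order listed in `inputCols`. A numeric or boolean column contributes one slot, and a vector column contributes all of its slots. The plain-Python sketch below mimics that concatenation. `assemble` is a hypothetical helper, and the `(size, {index: value})` pairs are an assumed stand-in for Spark's sparse vectors.

```python
def assemble(parts):
    """Concatenate scalars and sparse vectors into one sparse vector.

    Each part is a number/bool or a (size, {index: value}) pair. Indices of each
    part are shifted by the total size of the parts before it.
    """
    offset = 0
    merged = {}
    for part in parts:
        if isinstance(part, tuple):
            size, entries = part
            for index, value in entries.items():
                merged[offset + index] = float(value)
            offset += size
        else:
            if part != 0:  # keep the result sparse: zeros are not stored
                merged[offset] = float(part)
            offset += 1
    return offset, merged


# word counts (size 3), bigram counts (size 2), punc=11, fav_number=0, emojis=True
print(assemble([(3, {0: 1.0, 2: 1.0}), (2, {1: 1.0}), 11, 0, True]))
# (8, {0: 1.0, 2: 1.0, 4: 1.0, 5: 11.0, 7: 1.0})
```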

##### Convert gender to a numeric label (0 = female, 1 = male, 2 = brand); `StringIndexer` assigns indices by descending label frequency

In [39]:
import pyspark.sql.functions as F

# Convert the boolean emojis column to 0/1 (true -> 1, false -> 0).
# Note: new_df is not used below; the StringIndexer step works on tweets_final,
# and VectorAssembler accepts boolean columns directly.
new_df = tweets_final.withColumn('emojis', F.col('emojis').cast('integer'))

In [40]:
new_df.take(1)

[Row(row_index=1, gender='male', created='12/5/13 1:48', fav_number='0', retweet_count='0', text='robbie e responds to critics after win against eddie edwards in the #worldtitleseries https://t.co/nsybbmvjkz', tweet_count=110964, tweet_created='10/26/15 12:40', tweet_count_new=110964, account_years=2, words=['robbie', 'e', 'responds', 'to', 'critics', 'after', 'win', 'against', 'eddie', 'edwards', 'in', 'the', '#worldtitleseries', 'https://t.co/nsybbmvjkz'], filtered=['robbie', 'e', 'responds', 'critics', 'win', 'eddie', 'edwards', '#worldtitleseries', 'https://t.co/nsybbmvjkz'], emojis=0, punc=11, text_as_vec=SparseVector(10000, {162: 1.0, 1521: 1.0, 1785: 1.0, 4032: 1.0, 5505: 1.0, 5993: 1.0, 6493: 1.0, 7437: 1.0, 9022: 1.0}), bigrams=SparseVector(10000, {162: 1.0, 1521: 1.0, 1785: 1.0, 4032: 1.0, 5505: 1.0, 5993: 1.0, 6493: 1.0, 7437: 1.0, 9022: 1.0}))]

In [41]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="gender", outputCol="gender_num")
final = indexer.fit(tweets_final).transform(tweets_final)
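With its default ordering (`stringOrderType="frequencyDesc"`), `StringIndexer` gives index 0.0 to the most frequent label. In recent Spark versions, ties are broken alphabetically. The plain-Python sketch below mirrors that rule, and `frequency_index` is a hypothetical helper. Under this rule, the mapping 0 = female, 1 = male, 2 = brand holds only if female is the most common label and brand the least common.

```python
from collections import Counter


def frequency_index(labels):
    """Map each label to an index by descending frequency, ties broken alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


print(frequency_index(['female', 'female', 'male', 'brand', 'male', 'female']))
# {'female': 0.0, 'male': 1.0, 'brand': 2.0}
```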

In [42]:
final.select('gender_num').take(6)

[Row(gender_num=1.0),
 Row(gender_num=1.0),
 Row(gender_num=1.0),
 Row(gender_num=1.0),
 Row(gender_num=0.0),
 Row(gender_num=0.0)]

In [43]:
final = final.withColumn("fav_number", final["fav_number"].cast('integer'))
final = final.withColumn("retweet_count", final["retweet_count"].cast('integer'))

In [44]:
final  # the DataFrame repr lists every column and its type (final.printSchema() prints it as a tree)

DataFrame[row_index: int, gender: string, created: string, fav_number: int, retweet_count: int, text: string, tweet_count: int, tweet_created: string, tweet_count_new: int, account_years: bigint, words: array<string>, filtered: array<string>, emojis: boolean, punc: bigint, text_as_vec: vector, bigrams: vector, gender_num: double]

In [61]:
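from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.util import MLUtils

# text_as_vec and bigrams were built with pyspark.mllib's HashingTF, which produces old-style
# mllib vectors; VectorAssembler only accepts pyspark.ml vectors, so convert them first.
final = MLUtils.convertVectorColumnsToML(final, 'text_as_vec', 'bigrams')

assembler = VectorAssembler(
    inputCols=['bigrams'],
    outputCol="features")

# Use `final`, not tweets_final: only `final` has the gender_num label column
transformed = assembler.transform(final.limit(1000))
transformed.select("features", "gender_num").show(2, truncate=False)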

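Py4JJavaError: An error occurred while calling o791.transform.
: org.apache.spark.SparkException: Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1

The broadcast most likely timed out because `tweets_final` is built from several joins on `row_index` columns. Those columns are computed with `row_number()` over windows without `partitionBy`, which forces Spark to move every row into a single partition. As the message suggests, possible workarounds are raising `spark.sql.broadcastTimeout` or disabling broadcast joins with `spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)`.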
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:172)
	at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:515)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:188)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:184)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:116)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:210)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:100)
	at org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction(WholeStageCodegenExec.scala:221)
	at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:192)
	at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:149)
	at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:71)
	at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:194)
	at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:149)
	at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:97)
	at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:222)
	at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:194)
	at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:149)
	at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.InputRDDCodegen.doProduce(WholeStageCodegenExec.scala:483)
	at org.apache.spark.sql.execution.InputRDDCodegen.doProduce$(WholeStageCodegenExec.scala:456)
	at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:137)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:97)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:51)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:95)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:39)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:51)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.SortExec.doProduce(SortExec.scala:157)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.SortExec.produce(SortExec.scala:40)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:632)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:692)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.window.WindowExec.doExecute(WindowExec.scala:125)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:133)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:47)
	at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:132)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:316)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:434)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3625)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2702)
	at org.apache.spark.sql.Dataset.first(Dataset.scala:2709)
	at org.apache.spark.ml.feature.VectorAssembler$.getVectorLengthsFromFirstRow(VectorAssembler.scala:205)
	at org.apache.spark.ml.feature.VectorAssembler$.getLengths(VectorAssembler.scala:231)
	at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:95)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException
	at java.util.concurrent.FutureTask.get(FutureTask.java:205)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:161)
	... 183 more
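
The root cause in the trace above is the `java.util.concurrent.TimeoutException` raised from `BroadcastExchangeExec.doExecuteBroadcast`. `VectorAssembler.transform` calls `Dataset.first` (via `getVectorLengthsFromFirstRow`) to infer vector lengths, which executes the upstream join plan, and broadcasting one side of a join did not finish in time. One commonly used mitigation, which is an assumption here and was not run in this notebook, is to raise Spark's broadcast timeout or turn off automatic broadcast joins before rerunning the assembler. The timeout value `1200` is an arbitrary illustration.

```python
from pyspark.sql import SparkSession

# Reuses the existing session if one is already running
spark = SparkSession.builder.appName("comm").getOrCreate()

# Wait longer for broadcast joins to complete (seconds; Spark's default is 300).
# 1200 is an illustrative value, not a tuned one.
spark.conf.set("spark.sql.broadcastTimeout", "1200")

# Alternatively, stop Spark from picking broadcast joins automatically based on
# estimated table size; -1 disables the size threshold (default is 10 MB).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```

Disabling the threshold makes Spark fall back to sort-merge joins, which are slower for small tables but do not depend on the broadcast wait time.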


In [None]:
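from pyspark.ml.feature import StandardScaler

# Rescale each entry of the assembled "features" vector to unit standard deviation.
# withStd=True and withMean=False are Spark's defaults, spelled out here for clarity;
# leaving withMean off avoids centering, which would turn sparse vectors dense.
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)
# fit() computes per-feature statistics; transform() applies them to every row
scalerModel = scaler.fit(transformed)
scaledData = scalerModel.transform(transformed)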
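
To make the scaling step concrete without a Spark cluster, the sketch below reproduces the arithmetic of `StandardScaler` in plain Python on a small list-of-rows matrix. It is an illustration under stated assumptions, not Spark's implementation: the function name `standard_scale` is hypothetical, it assumes dense input with at least two rows, and it follows Spark's documented behavior of using the corrected sample standard deviation (divide by n - 1) and returning 0.0 for a feature whose standard deviation is 0.

```python
import statistics
from typing import List


def standard_scale(rows: List[List[float]],
                   with_mean: bool = False,
                   with_std: bool = True) -> List[List[float]]:
    """Standardize each column of a dense list-of-rows matrix.

    Hypothetical helper that mimics what pyspark.ml.feature.StandardScaler
    computes; it is not how Spark implements it.
    """
    n_features = len(rows[0])
    columns = [[row[j] for row in rows] for j in range(n_features)]
    means = [statistics.fmean(col) for col in columns]
    # statistics.stdev divides by (n - 1): the corrected sample std.
    # It raises StatisticsError if there are fewer than two rows.
    stds = [statistics.stdev(col) for col in columns]

    scaled = []
    for row in rows:
        new_row = []
        for j, value in enumerate(row):
            # Optional centering (off by default, as in Spark)
            x = value - means[j] if with_mean else value
            if with_std:
                # A zero-variance feature is mapped to 0.0 instead of dividing by 0
                x = x / stds[j] if stds[j] != 0 else 0.0
            new_row.append(x)
        scaled.append(new_row)
    return scaled


if __name__ == "__main__":
    data = [[1.0, 10.0], [2.0, 10.0], [3.0, 10.0]]
    print(standard_scale(data))                  # scale only (Spark defaults)
    print(standard_scale(data, with_mean=True))  # center, then scale
```

In this toy input the first column has sample standard deviation 1, so its values are unchanged by scaling alone, while the constant second column collapses to zeros; with centering enabled the first column becomes -1, 0, 1.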