# Mic2img Lite
Works on a T4 GPU, so no Google Colab subscription is needed.
You do need an OpenAI API key and a StabilityAI API key.

In [None]:
#@title Check GPU status
# Print the status of the NVIDIA GPU attached to this runtime.
!nvidia-smi

In [None]:
#@title Mount Google Drive
from pathlib import Path
import os

# Defaults used when Drive is not mounted: paths stay relative to the
# current working directory.
# NOTE: drive_mounted is reset to False every time this cell runs, so the
# `not drive_mounted` check below always passes on a re-run.
drive_mounted = False
gdrive_fpath = '.'
local_path = '/content/'



#@markdown Mounting your google drive is optional.
#@markdown If you mount your drive, the text and image files will be stored on it.

mount_gdrive = True # @param{type:"boolean"}
if mount_gdrive and not drive_mounted:
    from google.colab import drive

    gdrive_mountpoint = '/content/drive/' #@param{type:"string"}
    gdrive_subdirectory = 'MyDrive/mic2img' #@param{type:"string"}
    gdrive_fpath = str(Path(gdrive_mountpoint) / gdrive_subdirectory)
    try:
        drive.mount(gdrive_mountpoint, force_remount = True)
        # Create the project folder on Drive and make it the working
        # directory, so relative paths used by later cells resolve inside it.
        !mkdir -p {gdrive_fpath}
        %cd {gdrive_fpath}
        local_path = gdrive_fpath
        drive_mounted = True
    except OSError:
        # The hint is printed, but the error is still re-raised afterwards.
        print(
            "If you received an OSError and your drive"
            " was already mounted, ignore it."
            )
        raise

In [None]:
#@title Install dependencies
# openai: speech translation (Whisper); stability-sdk: image generation;
# keybert + transformers: keyword and emotion extraction.
!pip install openai
!pip install stability-sdk
!pip install keybert
!pip install transformers -q

In [None]:
#@title Install keyword & emotion models
from keybert import KeyBERT
from transformers import RobertaTokenizerFast, TFRobertaForSequenceClassification, pipeline

# Keyword extractor, constructed with KeyBERT's default arguments.
kw_model = KeyBERT()

# Load the EmoRoBERTa tokenizer and TensorFlow model once.
tokenizer = RobertaTokenizerFast.from_pretrained("arpanghoshal/EmoRoBERTa")
model = TFRobertaForSequenceClassification.from_pretrained("arpanghoshal/EmoRoBERTa")

# Reuse the objects loaded above. Passing the model name here instead made
# pipeline() load the same model a second time and left `tokenizer` and
# `model` unused. return_all_scores=True keeps a score for every label.
emotion = pipeline('text-classification', model=model, tokenizer=tokenizer, return_all_scores=True)

In [None]:
#@title Global settings

# API credentials; the values below are presumably placeholders to be
# replaced with your own keys.
#open_ai_organisation = "Personal" #@param {type:"string"}
open_ai_key = "123456" #@param {type:"string"}
stability_ai_key = "123456" #@param {type:"string"}

# prompt_modifier is appended to every image prompt; directory_name is the
# folder (relative to the working directory) that receives the images and
# the transcript.
prompt_modifier = "drawn illustration"#@param {type:"string"}
directory_name = "test_0"#@param {type:"string"}

# Number of top-scoring emotion labels added to each prompt.
amount_of_emotions= 4#@param {type:"slider", min:0, max:10, step:1}

In [None]:
#@title Server related stuff
# Address and port that the image-preview HTTPServer binds to in start_server().
hostName = "0.0.0.0"
serverPort = 8000


# Python 3 server example
from http.server import BaseHTTPRequestHandler, HTTPServer
from google.colab.output import eval_js
import time
import base64

class MyServer(BaseHTTPRequestHandler):
  """Serve a self-refreshing HTML page that shows the most recent image.

  Every GET reads ``{gdrive_fpath}/{directory_name}/latest.png`` (when it
  exists) and embeds it in the page as a base64 data URI. The page reloads
  itself every two seconds.
  """

  def do_GET(self):
    """Answer any GET with the preview page, with or without an image."""
    latest_path = f"{gdrive_fpath}/{directory_name}/latest.png"
    encoded_image = None
    try:
      # The context manager closes the handle; the handler runs every two
      # seconds, so an unclosed file would leak descriptors.
      with open(latest_path, "rb") as image_file:
        encoded_image = base64.b64encode(image_file.read()).decode('ascii')
    except FileNotFoundError:
      # No image generated yet: serve the page without an <img> tag.
      pass

    self.send_response(200)
    self.send_header("Content-type", "text/html")
    self.end_headers()

    parts = [
      "<html style=\"background-color:#000000;text-align:center;\"><head><title>Latest image</title></head>",
      "<body>",
    ]
    if encoded_image is not None:
      # The file is a PNG, so declare it as such; 100vh caps the height at
      # the viewport height (a bare "vh" is not a valid CSS length).
      parts.append("<img style=\"width:auto;height:100%;max-height:100vh;\" src=\"data:image/png;base64," + encoded_image + "\"/>")
    parts.append("<script> setTimeout(() => { location.reload() }, 2000) </script>")
    parts.append("</body></html>")
    self.wfile.write("".join(parts).encode("utf-8"))

  def log_message(self, format, *args):
    """Suppress the default per-request logging."""
    return

# Ask Colab for the proxied address of the server port and print it.
print(eval_js(f"google.colab.kernel.proxyPort({serverPort})"))

Click the link above to display the images in a new window.

Refresh the page after starting the loop below; it should start displaying images within a minute.

# Start the loop



In [None]:
# imports
import ipywidgets as widgets
from threading import Thread
from queue import Queue
import openai
from IPython.display import Javascript, display
from google.colab import output
from base64 import b64decode
import time
import subprocess
import os
import signal
import sys
from PIL import Image
from io import BytesIO
import multiprocessing
import stability_sdk.interfaces.gooseai.generation.generation_pb2 as generation
from stability_sdk.client import StabilityInference


# JavaScript injected into the notebook page by record(). It defines a global
# `record(time)` that captures microphone audio with MediaRecorder for `time`
# milliseconds and resolves to the recording encoded as a data URL.
RECORD = """
const sleep  = time => new Promise(resolve => setTimeout(resolve, time))
const b2text = blob => new Promise(resolve => {
  const reader = new FileReader()
  reader.onloadend = e => resolve(e.srcElement.result)
  reader.readAsDataURL(blob)
})
var record = time => new Promise(async resolve => {
  stream = await navigator.mediaDevices.getUserMedia({ audio: true })
  recorder = new MediaRecorder(stream)
  chunks = []
  recorder.ondataavailable = e => chunks.push(e.data)
  recorder.start()
  await sleep(time)
  recorder.onstop = async ()=>{
    blob = new Blob(chunks)
    text = await b2text(blob)
    resolve(text)
  }
  recorder.stop()
})
"""

# Hand the credentials from the settings cell to the API clients.
#openai.organization = open_ai_organisation
openai.api_key = open_ai_key
stability_key = stability_ai_key

# Index of the next image; also used to build the NNNN.png file name.
counter = 0

def record(sec=3):
  """Record `sec` seconds of microphone audio in the browser.

  Injects the RECORD script, runs its `record()` function with the duration
  in milliseconds, and returns the base64-decoded payload of the resulting
  data URL as bytes.
  """
  display(Javascript(RECORD))
  duration_ms = sec * 1000
  data_url = output.eval_js('record(%d)' % duration_ms)
  # A data URL looks like "<header>,<base64 payload>"; keep the payload.
  payload = data_url.split(',')[1]
  return b64decode(payload)

# start_recording() puts a flag on `messages`; speech_recognition() keeps
# looping for as long as this queue is non-empty.
messages = Queue()
# Recorded audio clips (raw bytes) handed from the recording loop to the
# speech_recognition() worker thread.
recordings = Queue()

# Output widget handed to speech_recognition() and passed to display().
textblock = widgets.Output()

def prompt_to_image(i, prompt):
    """Generate an image for `prompt` through the Stability API.

    Args:
        i: Index of the image being generated. When greater than 0, the
            image previously saved as ``{directory_name}/{i-1:04d}.png`` is
            passed to the API as the init image.
        prompt: Text prompt for the generation request.

    Returns:
        The first image artifact of the response as a PIL image, or None
        when the response contains no image artifact.
    """
    api = StabilityInference(key=stability_key, verbose=True)
    # Derive the previous frame from the argument rather than from the global
    # `counter`, so the function honours whatever index the caller passes.
    prev = Image.open(f"{directory_name}/{i - 1:04d}.png") if i > 0 else None
    for resp in api.generate(prompt=prompt, init_image=prev):
        for artifact in resp.artifacts:
            if artifact.type == generation.ARTIFACT_IMAGE:
                return Image.open(BytesIO(artifact.binary))
    return None

def speech_recognition(textblock):
  """Worker loop: turn each queued recording into text and then an image.

  For every clip taken from `recordings` the loop converts it to WAV with
  ffmpeg, translates it with the OpenAI "whisper-1" model, builds a prompt
  from the extracted keywords, emotions and `prompt_modifier`, generates an
  image, and stores it as ``latest.png`` and ``NNNN.png`` in
  `directory_name`. The transcript is appended to ``recorded.txt``. A clip
  that yields no text or no image is skipped; the loop keeps running.

  Args:
    textblock: Output widget supplied by the caller; not used by this loop.
  """
  global counter
  webm_path = "/content/audio.webm"
  wav_path = "/content/audio.wav"

  while not messages.empty():
    frames = recordings.get()
    if not frames:
      continue

    # Remove leftovers from the previous clip so ffmpeg never has to
    # overwrite an existing output file.
    for stale_path in (webm_path, wav_path):
      if os.path.exists(stale_path):
        os.remove(stale_path)

    with open(webm_path, 'wb') as f:
      f.write(frames)
    # Convert only after the file has been closed, so ffmpeg reads the
    # complete recording rather than a partially flushed one.
    command = ['ffmpeg', '-i', webm_path, wav_path]
    subprocess.run(command, stdout=subprocess.PIPE, stdin=subprocess.PIPE)

    with open(wav_path, 'rb') as f:
      t = openai.Audio.translate("whisper-1", f)
    try:
      work_text = t["text"]
    except (KeyError, TypeError):
      # No usable transcript for this clip: skip it and keep the worker alive.
      continue
    print(work_text)

    sentence = Prompt(work_text)
    keywords = sentence.extract_keywords()
    emo = sentence.extract_emotions()
    final_prompt = keywords + emo + prompt_modifier
    print(final_prompt)

    image = prompt_to_image(counter, final_prompt)
    if not image:
      # Nothing was generated: skip this clip instead of ending the loop.
      continue

    os.makedirs(directory_name, exist_ok=True)

    # Image.save() returns None, so keep `image` bound to the picture itself.
    image.save(f"{directory_name}/latest.png")
    image.save(f"{directory_name}/{counter:04d}.png")

    with open(f"{directory_name}/recorded.txt", 'a') as f:
      f.write("\n" + work_text)

    display(image)

    counter += 1


class Prompt:
  """A transcribed sentence plus helpers that derive prompt fragments from it."""

  def __init__(self, sentence):
    """Store the sentence that the extractors work on."""
    self.sentence = sentence

  def extract_keywords(self):
    """Return the single-word keywords found by `kw_model`.

    The result is the string form of a list, e.g. ``"['cat', 'garden']"``.
    """
    keyword_list = kw_model.extract_keywords(self.sentence, keyphrase_ngram_range=(1, 1))
    # Each entry is indexable; its first item is the keyword.
    return str([entry[0] for entry in keyword_list])

  def extract_emotions(self):
    """Return the `amount_of_emotions` highest-scoring emotion labels.

    The result is the string form of a list, e.g. ``"['joy', 'love']"``.
    """
    # The pipeline returns one list of label/score dicts per input sentence.
    emotion_labels, = emotion(self.sentence)
    ranked = sorted(emotion_labels, key=lambda item: item['score'], reverse=True)
    # Read the label by key instead of relying on the dict's key order.
    return str([item['label'] for item in ranked[:amount_of_emotions]])

def start_recording():
  """Start the transcription worker, then record 30-second clips forever.

  Puts a flag on `messages` so the worker loop runs, launches
  speech_recognition() on its own thread, and then keeps pushing recorded
  clips onto `recordings`. This function does not return.
  """
  print('start recording')
  messages.put(True)

  worker = Thread(target=speech_recognition, args=(textblock,))
  worker.start()

  while True:
    clip = record(30)
    print('finished recording')
    recordings.put(clip)
    time.sleep(0.1)

def start_server():
  """Run the image-preview HTTP server until it is interrupted or fails.

  Binds an HTTPServer with the MyServer handler to (hostName, serverPort)
  and serves requests. The socket is always closed on the way out.
  """
  webServer = HTTPServer((hostName, serverPort), MyServer)
  print("Server started http://%s:%s" % (hostName, serverPort))

  try:
    webServer.serve_forever()
  except KeyboardInterrupt:
    # Regular shutdown request; fall through to the cleanup below.
    pass
  except Exception as exc:
    # Keep the shutdown best-effort, but report the failure instead of
    # hiding it.
    print(f"Server error: {exc!r}")
  finally:
    webServer.server_close()
    print("Server stopped.")

# Run the preview server in its own process so that serve_forever() does not
# block the recording loop started below.
server = multiprocessing.Process(target=start_server)
server.start()

def signal_handler(sig, frame):
  """On SIGINT, stop the server process and exit."""
  server.terminate()  # sends a SIGTERM
  sys.exit(0)



signal.signal(signal.SIGINT, signal_handler)

# start_recording() loops forever, so the widget has to be displayed before
# the loop is entered; placed after it, this call was never reached.
display(textblock)
start_recording()
